diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index ecadf53c4cd..61400a5379c 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -369,6 +369,15 @@ For more information, see [Enabling Metrics](../configuration/index.md#enabling-
|`jvm/gc/count`|Garbage collection count|`gcName` (cms/g1/parallel/etc.), `gcGen` (old/young)|Varies|
|`jvm/gc/cpu`|Count of CPU time in Nanoseconds spent on garbage collection. Note: `jvm/gc/cpu` represents the total time over multiple GC cycles; divide by `jvm/gc/count` to get the mean GC time per cycle.|`gcName`, `gcGen`|Sum of `jvm/gc/cpu` should be within 10-30% of sum of `jvm/cpu/total`, depending on the GC algorithm used (reported by [`JvmCpuMonitor`](../configuration/index.md#enabling-metrics)). |
+### ZooKeeper
+
+These metrics are available unless `druid.zk.service.enabled = false`.
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`zk/connected`|Indicator of connection status. `1` for connected, `0` for disconnected. Emitted once per monitor period.|None|1|
+|`zk/reconnect/time`|Amount of time, in milliseconds, that a server was disconnected from ZooKeeper before reconnecting. Emitted on reconnection. Not emitted if connection to ZooKeeper is permanently lost, because in this case, there is no reconnection.|None|Not present|
+
### EventReceiverFirehose
The following metric is only available if the `EventReceiverFirehoseMonitor` module is included.
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 97297c085e2..47875e106f3 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -3440,7 +3440,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
"Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]",
serviceEmitter.getExceptionMessage()
);
- Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
+ Assert.assertEquals(ISE.class.getName(), serviceEmitter.getExceptionClass());
}
@Test
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 19caa96ead9..37691b1a7e5 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -3461,7 +3461,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
"Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]",
serviceEmitter.getExceptionMessage()
);
- Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
+ Assert.assertEquals(ISE.class.getName(), serviceEmitter.getExceptionClass());
}
@Test
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
index 80d1dec0e66..7a2eae93e16 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
@@ -168,7 +168,11 @@ public class Initializer
JsonConfigProvider.bind(binder, MetadataStorageTablesConfig.PROPERTY_BASE, MetadataStorageTablesConfig.class);
// Build from properties provided in the config
- JsonConfigProvider.bind(binder, MetadataStorageConnectorConfig.PROPERTY_BASE, MetadataStorageConnectorConfig.class);
+ JsonConfigProvider.bind(
+ binder,
+ MetadataStorageConnectorConfig.PROPERTY_BASE,
+ MetadataStorageConnectorConfig.class
+ );
}
@Provides
@@ -327,7 +331,7 @@ public class Initializer
*
* The builder registers {@code DruidNodeDiscoveryProvider} by default: add any
* test-specific instances as needed.
- */
+ */
public Builder eagerInstance(Class> theClass)
{
this.eagerCreation.add(theClass);
@@ -343,7 +347,7 @@ public class Initializer
return this;
}
- public Builder modules(Module...modules)
+ public Builder modules(Module... modules)
{
return modules(Arrays.asList(modules));
}
diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java b/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java
index 7ad5d1091de..6531c6bd3a8 100644
--- a/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java
+++ b/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java
@@ -29,17 +29,11 @@ import org.apache.druid.java.util.emitter.service.AlertBuilder;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import javax.annotation.Nullable;
-import java.io.PrintWriter;
-import java.io.StringWriter;
/**
*/
public class EmittingLogger extends Logger
{
- public static final String EXCEPTION_TYPE_KEY = "exceptionType";
- public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage";
- public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace";
-
private static volatile ServiceEmitter emitter = null;
private final String className;
@@ -93,19 +87,8 @@ public class EmittingLogger extends Logger
throw e;
}
- final AlertBuilder retVal = new EmittingAlertBuilder(t, StringUtils.format(message, objects), emitter)
+ return new EmittingAlertBuilder(t, StringUtils.format(message, objects), emitter)
.addData("class", className);
-
- if (t != null) {
- final StringWriter trace = new StringWriter();
- final PrintWriter pw = new PrintWriter(trace);
- t.printStackTrace(pw);
- retVal.addData("exceptionType", t.getClass());
- retVal.addData("exceptionMessage", t.getMessage());
- retVal.addData("exceptionStackTrace", trace.toString());
- }
-
- return retVal;
}
public class EmittingAlertBuilder extends AlertBuilder
@@ -118,6 +101,7 @@ public class EmittingLogger extends Logger
{
super(description, emitter);
this.t = t;
+ addThrowable(t);
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java
index 9938e7cb7ae..352272d67bc 100644
--- a/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java
+++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java
@@ -24,12 +24,20 @@ import com.google.common.collect.Maps;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
+import javax.annotation.Nullable;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.Map;
/**
-*/
+ *
+ */
public class AlertBuilder extends ServiceEventBuilder
{
+ public static final String EXCEPTION_TYPE_KEY = "exceptionType";
+ public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage";
+ public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace";
+
protected final Map dataMap = Maps.newLinkedHashMap();
protected final String description;
protected final ServiceEmitter emitter;
@@ -67,6 +75,20 @@ public class AlertBuilder extends ServiceEventBuilder
return this;
}
+ public AlertBuilder addThrowable(@Nullable final Throwable t)
+ {
+ if (t != null) {
+ final StringWriter trace = new StringWriter();
+ final PrintWriter pw = new PrintWriter(trace);
+ t.printStackTrace(pw);
+ addData(EXCEPTION_TYPE_KEY, t.getClass().getName());
+ addData(EXCEPTION_MESSAGE_KEY, t.getMessage());
+ addData(EXCEPTION_STACK_TRACE_KEY, trace.toString());
+ }
+
+ return this;
+ }
+
public AlertBuilder severity(AlertEvent.Severity severity)
{
this.severity = severity;
diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/service/AlertBuilderTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/service/AlertBuilderTest.java
new file mode 100644
index 00000000000..275dc8be703
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/java/util/emitter/service/AlertBuilderTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.emitter.service;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.emitter.core.EventMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class AlertBuilderTest
+{
+ @Test
+ public void testAlertBuilder()
+ {
+ final AlertEvent alertEvent =
+ AlertBuilder.create("alert[%s]", "oops")
+ .addData(ImmutableMap.of("foo", "bar"))
+ .addData(ImmutableMap.of("baz", "qux"))
+ .addThrowable(new RuntimeException("an exception!"))
+ .build("druid/test", "example.com");
+
+ final EventMap alertMap = alertEvent.toMap();
+
+ Assert.assertEquals("alerts", alertMap.get("feed"));
+ Assert.assertEquals("alert[oops]", alertMap.get("description"));
+ Assert.assertEquals("druid/test", alertMap.get("service"));
+ Assert.assertEquals("example.com", alertMap.get("host"));
+
+ final Map dataMap = (Map) alertMap.get("data");
+ Assert.assertEquals("java.lang.RuntimeException", dataMap.get("exceptionType"));
+ Assert.assertEquals("an exception!", dataMap.get("exceptionMessage"));
+ Assert.assertEquals("bar", dataMap.get("foo"));
+ Assert.assertEquals("qux", dataMap.get("baz"));
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java b/processing/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java
index ae7a1dfde05..70c566b3f0e 100644
--- a/processing/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java
+++ b/processing/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java
@@ -19,6 +19,8 @@
package org.apache.druid.testing.junit;
+import com.google.common.collect.ImmutableList;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LogEvent;
@@ -28,8 +30,8 @@ import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.junit.rules.ExternalResource;
+import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
/**
* JUnit rule to capture a class's logger output to an in-memory buffer to allow verification of log messages in tests.
@@ -73,6 +75,14 @@ public class LoggerCaptureRule extends ExternalResource
inMemoryAppender.clearLogEvents();
}
+ /**
+ * Wait for the captured
+ */
+ public void awaitLogEvents() throws InterruptedException
+ {
+ inMemoryAppender.awaitLogEvents();
+ }
+
private static class InMemoryAppender extends AbstractAppender
{
static final String NAME = InMemoryAppender.class.getName();
@@ -80,32 +90,51 @@ public class LoggerCaptureRule extends ExternalResource
private final String targetLoggerName;
// logEvents has concurrent iteration and modification in CuratorModuleTest::exitsJvmWhenMaxRetriesExceeded(), needs to be thread safe
- private final CopyOnWriteArrayList logEvents;
+ @GuardedBy("logEvents")
+ private final List logEvents;
InMemoryAppender(Class> targetClass)
{
super(NAME, null, null);
targetLoggerName = targetClass.getName();
- logEvents = new CopyOnWriteArrayList<>();
+ logEvents = new ArrayList<>();
}
@Override
public void append(LogEvent logEvent)
{
- if (logEvent.getLoggerName().equals(targetLoggerName)) {
- logEvents.add(logEvent);
+ synchronized (logEvents) {
+ if (logEvent.getLoggerName().equals(targetLoggerName)) {
+ logEvents.add(logEvent);
+ logEvents.notifyAll();
+ }
}
}
List getLogEvents()
{
- return logEvents;
+ synchronized (logEvents) {
+ return ImmutableList.copyOf(logEvents);
+ }
}
void clearLogEvents()
{
- logEvents.clear();
+ synchronized (logEvents) {
+ logEvents.clear();
+ }
+ }
+
+ /**
+ * Wait for "logEvents" to be nonempty. If it is already nonempty, return immediately.
+ */
+ void awaitLogEvents() throws InterruptedException
+ {
+ synchronized (logEvents) {
+ while (logEvents.isEmpty()) {
+ logEvents.wait();
+ }
+ }
}
}
}
-
diff --git a/server/src/main/java/org/apache/druid/curator/CuratorModule.java b/server/src/main/java/org/apache/druid/curator/CuratorModule.java
index 201c96bfdff..a7e76af474a 100644
--- a/server/src/main/java/org/apache/druid/curator/CuratorModule.java
+++ b/server/src/main/java/org/apache/druid/curator/CuratorModule.java
@@ -36,6 +36,9 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.AlertBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.server.metrics.MetricsModule;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
@@ -58,7 +61,6 @@ public class CuratorModule implements Module
}
/**
- *
* @param haltOnFailedStart set to true if the JVM needs to be halted within 30 seconds of failed initialization
* due to unhandled curator exceptions.
*/
@@ -72,6 +74,7 @@ public class CuratorModule implements Module
{
JsonConfigProvider.bind(binder, CuratorConfig.CONFIG_PREFIX, ZkEnablementConfig.class);
JsonConfigProvider.bind(binder, CuratorConfig.CONFIG_PREFIX, CuratorConfig.class);
+ MetricsModule.register(binder, DruidConnectionStateListener.class);
}
/**
@@ -88,7 +91,8 @@ public class CuratorModule implements Module
);
}
- RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, config.getMaxZkRetries());
+ final RetryPolicy retryPolicy =
+ new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, config.getMaxZkRetries());
return builder
.ensembleProvider(new FixedEnsembleProvider(config.getZkHosts()))
@@ -105,7 +109,13 @@ public class CuratorModule implements Module
*/
@Provides
@LazySingleton
- public CuratorFramework makeCurator(ZkEnablementConfig zkEnablementConfig, CuratorConfig config, Lifecycle lifecycle)
+ public CuratorFramework makeCurator(
+ final ZkEnablementConfig zkEnablementConfig,
+ final CuratorConfig config,
+ final DruidConnectionStateListener connectionStateListener,
+ final ServiceEmitter emitter,
+ final Lifecycle lifecycle
+ )
{
if (!zkEnablementConfig.isEnabled()) {
throw new RuntimeException("Zookeeper is disabled, cannot create CuratorFramework.");
@@ -113,7 +123,34 @@ public class CuratorModule implements Module
final CuratorFramework framework = createCurator(config);
+ framework.getConnectionStateListenable().addListener(connectionStateListener);
+ addUnhandledErrorListener(framework, emitter, lifecycle);
+ addLifecycleHandler(framework, lifecycle);
+
+ return framework;
+ }
+
+ /**
+ * Provide an instance of {@link DruidConnectionStateListener} for monitoring connection state.
+ */
+ @Provides
+ @LazySingleton
+ public DruidConnectionStateListener makeConnectionStateListener(final ServiceEmitter emitter)
+ {
+ return new DruidConnectionStateListener(emitter);
+ }
+
+ /**
+ * Add unhandled error listener that shuts down the JVM.
+ */
+ private void addUnhandledErrorListener(
+ final CuratorFramework framework,
+ final ServiceEmitter emitter,
+ final Lifecycle lifecycle
+ )
+ {
framework.getUnhandledErrorListenable().addListener((message, e) -> {
+ emitter.emit(AlertBuilder.create("Unhandled Curator error").addThrowable(e));
log.error(e, "Unhandled error in Curator, stopping server.");
if (haltOnFailedStart) {
@@ -140,7 +177,13 @@ public class CuratorModule implements Module
shutdown(lifecycle);
});
+ }
+ /**
+ * Add unhandled error listener that shuts down the JVM.
+ */
+ private void addLifecycleHandler(final CuratorFramework framework, final Lifecycle lifecycle)
+ {
lifecycle.addHandler(
new Lifecycle.Handler()
{
@@ -159,23 +202,6 @@ public class CuratorModule implements Module
}
}
);
-
- return framework;
- }
-
- static class SecuredACLProvider implements ACLProvider
- {
- @Override
- public List getDefaultAcl()
- {
- return ZooDefs.Ids.CREATOR_ALL_ACL;
- }
-
- @Override
- public List getAclForPath(String path)
- {
- return ZooDefs.Ids.CREATOR_ALL_ACL;
- }
}
private void shutdown(Lifecycle lifecycle)
@@ -191,4 +217,19 @@ public class CuratorModule implements Module
System.exit(1);
}
}
+
+ private static class SecuredACLProvider implements ACLProvider
+ {
+ @Override
+ public List getDefaultAcl()
+ {
+ return ZooDefs.Ids.CREATOR_ALL_ACL;
+ }
+
+ @Override
+ public List getAclForPath(String path)
+ {
+ return ZooDefs.Ids.CREATOR_ALL_ACL;
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java b/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java
new file mode 100644
index 00000000000..b0ba274d501
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator;
+
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.druid.java.util.emitter.service.AlertBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+
+/**
+ * Curator {@link ConnectionStateListener} that uses a {@link ServiceEmitter} to send alerts on ZK connection loss,
+ * and emit metrics about ZK connection status.
+ */
+public class DruidConnectionStateListener extends AbstractMonitor implements ConnectionStateListener
+{
+ private static final String METRIC_IS_CONNECTED = "zk/connected";
+ private static final String METRIC_RECONNECT_TIME = "zk/reconnect/time";
+ private static final int NIL = -1;
+
+ private final ServiceEmitter emitter;
+
+ /**
+ * Current connection state.
+ */
+ @GuardedBy("this")
+ private ConnectionState currentState;
+
+ /**
+ * Time given by {@link System#currentTimeMillis()} at last disconnect.
+ */
+ @GuardedBy("this")
+ private long lastDisconnectTime = NIL;
+
+ public DruidConnectionStateListener(final ServiceEmitter emitter)
+ {
+ this.emitter = emitter;
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework curatorFramework, ConnectionState newState)
+ {
+ if (newState.isConnected()) {
+ final long disconnectDuration;
+
+ synchronized (this) {
+ if (lastDisconnectTime != NIL) {
+ disconnectDuration = Math.max(0, System.currentTimeMillis() - lastDisconnectTime);
+ } else {
+ disconnectDuration = NIL;
+ }
+
+ currentState = newState;
+ lastDisconnectTime = NIL;
+ }
+
+ if (disconnectDuration != NIL) {
+ emitter.emit(ServiceMetricEvent.builder().build(METRIC_RECONNECT_TIME, disconnectDuration));
+ }
+ } else {
+ synchronized (this) {
+ currentState = newState;
+ lastDisconnectTime = Math.max(lastDisconnectTime, System.currentTimeMillis());
+ }
+
+ emitter.emit(AlertBuilder.create("ZooKeeper connection[%s]", newState));
+ }
+ }
+
+ public boolean isConnected()
+ {
+ synchronized (this) {
+ return currentState != null && currentState.isConnected();
+ }
+ }
+
+ @Override
+ public boolean doMonitor(ServiceEmitter emitter)
+ {
+ emitter.emit(ServiceMetricEvent.builder().build(METRIC_IS_CONNECTED, isConnected() ? 1 : 0));
+ return true;
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java b/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java
index 5b88cb59f43..46257efc055 100644
--- a/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java
+++ b/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java
@@ -27,6 +27,8 @@ import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.junit.LoggerCaptureRule;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.LogEvent;
@@ -69,23 +71,25 @@ public final class CuratorModuleTest
Assert.assertEquals(CuratorModule.MAX_SLEEP_TIME_MS, retryPolicy.getMaxSleepTimeMs());
}
- @Test
+ @Test(timeout = 60_000L)
public void exitsJvmWhenMaxRetriesExceeded() throws Exception
{
Properties props = new Properties();
props.setProperty(CURATOR_CONNECTION_TIMEOUT_MS_KEY, "0");
Injector injector = newInjector(props);
- CuratorFramework curatorFramework = createCuratorFramework(injector, 0);
- curatorFramework.start();
- exit.expectSystemExitWithStatus(1);
logger.clearLogEvents();
+ exit.expectSystemExitWithStatus(1);
// This will result in a curator unhandled error since the connection timeout is 0 and retries are disabled
+ CuratorFramework curatorFramework = createCuratorFramework(injector, 0);
+ curatorFramework.start();
curatorFramework.create().inBackground().forPath("/foo");
// org.apache.curator.framework.impl.CuratorFrameworkImpl logs "Background retry gave up" unhandled error twice
+ logger.awaitLogEvents();
List loggingEvents = logger.getLogEvents();
+
Assert.assertTrue(
"Logging events: " + loggingEvents,
loggingEvents.stream()
@@ -121,8 +125,9 @@ public final class CuratorModuleTest
.add(
new LifecycleModule(),
new CuratorModule(false),
+ binder -> binder.bind(ServiceEmitter.class).to(NoopServiceEmitter.class),
binder -> binder.bind(Properties.class).toInstance(props)
- )
+ )
.build();
}
diff --git a/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java b/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java
new file mode 100644
index 00000000000..b173015959d
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator;
+
+import com.google.common.collect.ImmutableList;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class DruidConnectionStateListenerTest
+{
+ private TestEmitter emitter;
+ private DruidConnectionStateListener listener;
+
+ @Before
+ public void setUp()
+ {
+ emitter = new TestEmitter();
+ listener = new DruidConnectionStateListener(emitter);
+ }
+
+ @Test
+ public void test_isConnected()
+ {
+ Assert.assertFalse(listener.isConnected());
+
+ listener.stateChanged(null, ConnectionState.CONNECTED);
+ Assert.assertTrue(listener.isConnected());
+
+ listener.stateChanged(null, ConnectionState.SUSPENDED);
+ Assert.assertFalse(listener.isConnected());
+
+ listener.stateChanged(null, ConnectionState.RECONNECTED);
+ Assert.assertTrue(listener.isConnected());
+
+ listener.stateChanged(null, ConnectionState.LOST);
+ Assert.assertFalse(listener.isConnected());
+ }
+
+ @Test
+ public void test_doMonitor_init()
+ {
+ listener.doMonitor(emitter);
+ Assert.assertEquals(1, emitter.getEvents().size());
+
+ final Map eventMap = emitter.getEvents().get(0).toMap();
+ Assert.assertEquals("zk/connected", eventMap.get("metric"));
+ Assert.assertEquals(0, eventMap.get("value"));
+ }
+
+ @Test
+ public void test_doMonitor_connected()
+ {
+ listener.stateChanged(null, ConnectionState.CONNECTED);
+ listener.doMonitor(emitter);
+ Assert.assertEquals(1, emitter.getEvents().size());
+
+ final Map eventMap = emitter.getEvents().get(0).toMap();
+ Assert.assertEquals("zk/connected", eventMap.get("metric"));
+ Assert.assertEquals(1, eventMap.get("value"));
+ }
+
+ @Test
+ public void test_doMonitor_notConnected()
+ {
+ listener.stateChanged(null, ConnectionState.SUSPENDED);
+ listener.doMonitor(emitter);
+ Assert.assertEquals(2, emitter.getEvents().size()); // 2 because stateChanged emitted an alert
+
+ final Map eventMap = emitter.getEvents().get(1).toMap();
+ Assert.assertEquals("zk/connected", eventMap.get("metric"));
+ Assert.assertEquals(0, eventMap.get("value"));
+ }
+
+ @Test
+ public void test_suspendedAlert()
+ {
+ listener.stateChanged(null, ConnectionState.SUSPENDED);
+ Assert.assertEquals(1, emitter.getEvents().size());
+
+ final Map alertMap = emitter.getEvents().get(0).toMap();
+ Assert.assertEquals("alerts", alertMap.get("feed"));
+ Assert.assertEquals("ZooKeeper connection[SUSPENDED]", alertMap.get("description"));
+ }
+
+ @Test
+ public void test_reconnectedMetric()
+ {
+ listener.stateChanged(null, ConnectionState.SUSPENDED);
+ Assert.assertEquals(1, emitter.getEvents().size()); // the first stateChanged emits an alert
+
+ listener.stateChanged(null, ConnectionState.RECONNECTED);
+ Assert.assertEquals(2, emitter.getEvents().size()); // the second stateChanged emits a metric
+
+ final Map eventMap = emitter.getEvents().get(1).toMap();
+ Assert.assertEquals("metrics", eventMap.get("feed"));
+ Assert.assertEquals("zk/reconnect/time", eventMap.get("metric"));
+ MatcherAssert.assertThat(eventMap.get("value"), CoreMatchers.instanceOf(Long.class));
+ MatcherAssert.assertThat(((Number) eventMap.get("value")).longValue(), Matchers.greaterThanOrEqualTo(0L));
+ }
+
+ private static class TestEmitter extends NoopServiceEmitter
+ {
+ @GuardedBy("events")
+ private final List events = new ArrayList<>();
+
+ @Override
+ public void emit(Event event)
+ {
+ synchronized (events) {
+ events.add(event);
+ }
+ }
+
+ public List getEvents()
+ {
+ synchronized (events) {
+ return ImmutableList.copyOf(events);
+ }
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java
index c28744220f1..8a80b3df3e5 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java
@@ -19,8 +19,8 @@
package org.apache.druid.server.metrics;
-import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.AlertBuilder;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import javax.annotation.Nullable;
@@ -28,7 +28,7 @@ import java.util.Map;
public class ExceptionCapturingServiceEmitter extends ServiceEmitter
{
- private volatile Class exceptionClass;
+ private volatile String exceptionClass;
private volatile String exceptionMessage;
private volatile String stackTrace;
@@ -42,10 +42,10 @@ public class ExceptionCapturingServiceEmitter extends ServiceEmitter
{
//noinspection unchecked
final Map dataMap = (Map) event.toMap().get("data");
- final Class exceptionClass = (Class) dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY);
+ final String exceptionClass = (String) dataMap.get(AlertBuilder.EXCEPTION_TYPE_KEY);
if (exceptionClass != null) {
- final String exceptionMessage = (String) dataMap.get(EmittingLogger.EXCEPTION_MESSAGE_KEY);
- final String stackTrace = (String) dataMap.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY);
+ final String exceptionMessage = (String) dataMap.get(AlertBuilder.EXCEPTION_MESSAGE_KEY);
+ final String stackTrace = (String) dataMap.get(AlertBuilder.EXCEPTION_STACK_TRACE_KEY);
this.exceptionClass = exceptionClass;
this.exceptionMessage = exceptionMessage;
this.stackTrace = stackTrace;
@@ -53,7 +53,7 @@ public class ExceptionCapturingServiceEmitter extends ServiceEmitter
}
@Nullable
- public Class getExceptionClass()
+ public String getExceptionClass()
{
return exceptionClass;
}