mirror of https://github.com/apache/druid.git
Add ZooKeeper connection state alerts and metrics. (#14333)
* Add ZooKeeper connection state alerts and metrics. - New metric "zk/connected" is an indicator showing 1 when connected, 0 when disconnected. - New metric "zk/disconnected/time" measures time spent disconnected. - New alert when Curator connection state enters LOST or SUSPENDED. * Use right GuardedBy. * Test fixes, coverage. * Adjustment. * Fix tests. * Fix ITs. * Improved injection. * Adjust metric name, add tests.
This commit is contained in:
parent
3711c0d987
commit
3ff51487b7
|
@ -369,6 +369,15 @@ For more information, see [Enabling Metrics](../configuration/index.md#enabling-
|
|||
|`jvm/gc/count`|Garbage collection count|`gcName` (cms/g1/parallel/etc.), `gcGen` (old/young)|Varies|
|
||||
|`jvm/gc/cpu`|Count of CPU time in Nanoseconds spent on garbage collection. Note: `jvm/gc/cpu` represents the total time over multiple GC cycles; divide by `jvm/gc/count` to get the mean GC time per cycle.|`gcName`, `gcGen`|Sum of `jvm/gc/cpu` should be within 10-30% of sum of `jvm/cpu/total`, depending on the GC algorithm used (reported by [`JvmCpuMonitor`](../configuration/index.md#enabling-metrics)). |
|
||||
|
||||
### ZooKeeper
|
||||
|
||||
These metrics are available unless `druid.zk.service.enabled = false`.
|
||||
|
||||
|Metric|Description|Dimensions|Normal Value|
|
||||
|------|-----------|----------|------------|
|
||||
|`zk/connected`|Indicator of connection status. `1` for connected, `0` for disconnected. Emitted once per monitor period.|None|1|
|
||||
|`zk/reconnect/time`|Amount of time, in milliseconds, that a server was disconnected from ZooKeeper before reconnecting. Emitted on reconnection. Not emitted if connection to ZooKeeper is permanently lost, because in this case, there is no reconnection.|None|Not present|
|
||||
|
||||
### EventReceiverFirehose
|
||||
|
||||
The following metric is only available if the `EventReceiverFirehoseMonitor` module is included.
|
||||
|
|
|
@ -3440,7 +3440,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
"Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]",
|
||||
serviceEmitter.getExceptionMessage()
|
||||
);
|
||||
Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
|
||||
Assert.assertEquals(ISE.class.getName(), serviceEmitter.getExceptionClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -3461,7 +3461,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
|
|||
"Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]",
|
||||
serviceEmitter.getExceptionMessage()
|
||||
);
|
||||
Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
|
||||
Assert.assertEquals(ISE.class.getName(), serviceEmitter.getExceptionClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -168,7 +168,11 @@ public class Initializer
|
|||
JsonConfigProvider.bind(binder, MetadataStorageTablesConfig.PROPERTY_BASE, MetadataStorageTablesConfig.class);
|
||||
|
||||
// Build from properties provided in the config
|
||||
JsonConfigProvider.bind(binder, MetadataStorageConnectorConfig.PROPERTY_BASE, MetadataStorageConnectorConfig.class);
|
||||
JsonConfigProvider.bind(
|
||||
binder,
|
||||
MetadataStorageConnectorConfig.PROPERTY_BASE,
|
||||
MetadataStorageConnectorConfig.class
|
||||
);
|
||||
}
|
||||
|
||||
@Provides
|
||||
|
|
|
@ -29,17 +29,11 @@ import org.apache.druid.java.util.emitter.service.AlertBuilder;
|
|||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class EmittingLogger extends Logger
|
||||
{
|
||||
public static final String EXCEPTION_TYPE_KEY = "exceptionType";
|
||||
public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage";
|
||||
public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace";
|
||||
|
||||
private static volatile ServiceEmitter emitter = null;
|
||||
|
||||
private final String className;
|
||||
|
@ -93,19 +87,8 @@ public class EmittingLogger extends Logger
|
|||
throw e;
|
||||
}
|
||||
|
||||
final AlertBuilder retVal = new EmittingAlertBuilder(t, StringUtils.format(message, objects), emitter)
|
||||
return new EmittingAlertBuilder(t, StringUtils.format(message, objects), emitter)
|
||||
.addData("class", className);
|
||||
|
||||
if (t != null) {
|
||||
final StringWriter trace = new StringWriter();
|
||||
final PrintWriter pw = new PrintWriter(trace);
|
||||
t.printStackTrace(pw);
|
||||
retVal.addData("exceptionType", t.getClass());
|
||||
retVal.addData("exceptionMessage", t.getMessage());
|
||||
retVal.addData("exceptionStackTrace", trace.toString());
|
||||
}
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
public class EmittingAlertBuilder extends AlertBuilder
|
||||
|
@ -118,6 +101,7 @@ public class EmittingLogger extends Logger
|
|||
{
|
||||
super(description, emitter);
|
||||
this.t = t;
|
||||
addThrowable(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,12 +24,20 @@ import com.google.common.collect.Maps;
|
|||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class AlertBuilder extends ServiceEventBuilder<AlertEvent>
|
||||
{
|
||||
public static final String EXCEPTION_TYPE_KEY = "exceptionType";
|
||||
public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage";
|
||||
public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace";
|
||||
|
||||
protected final Map<String, Object> dataMap = Maps.newLinkedHashMap();
|
||||
protected final String description;
|
||||
protected final ServiceEmitter emitter;
|
||||
|
@ -67,6 +75,20 @@ public class AlertBuilder extends ServiceEventBuilder<AlertEvent>
|
|||
return this;
|
||||
}
|
||||
|
||||
public AlertBuilder addThrowable(@Nullable final Throwable t)
|
||||
{
|
||||
if (t != null) {
|
||||
final StringWriter trace = new StringWriter();
|
||||
final PrintWriter pw = new PrintWriter(trace);
|
||||
t.printStackTrace(pw);
|
||||
addData(EXCEPTION_TYPE_KEY, t.getClass().getName());
|
||||
addData(EXCEPTION_MESSAGE_KEY, t.getMessage());
|
||||
addData(EXCEPTION_STACK_TRACE_KEY, trace.toString());
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public AlertBuilder severity(AlertEvent.Severity severity)
|
||||
{
|
||||
this.severity = severity;
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.java.util.emitter.service;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.java.util.emitter.core.EventMap;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class AlertBuilderTest
|
||||
{
|
||||
@Test
|
||||
public void testAlertBuilder()
|
||||
{
|
||||
final AlertEvent alertEvent =
|
||||
AlertBuilder.create("alert[%s]", "oops")
|
||||
.addData(ImmutableMap.of("foo", "bar"))
|
||||
.addData(ImmutableMap.of("baz", "qux"))
|
||||
.addThrowable(new RuntimeException("an exception!"))
|
||||
.build("druid/test", "example.com");
|
||||
|
||||
final EventMap alertMap = alertEvent.toMap();
|
||||
|
||||
Assert.assertEquals("alerts", alertMap.get("feed"));
|
||||
Assert.assertEquals("alert[oops]", alertMap.get("description"));
|
||||
Assert.assertEquals("druid/test", alertMap.get("service"));
|
||||
Assert.assertEquals("example.com", alertMap.get("host"));
|
||||
|
||||
final Map<String, Object> dataMap = (Map<String, Object>) alertMap.get("data");
|
||||
Assert.assertEquals("java.lang.RuntimeException", dataMap.get("exceptionType"));
|
||||
Assert.assertEquals("an exception!", dataMap.get("exceptionMessage"));
|
||||
Assert.assertEquals("bar", dataMap.get("foo"));
|
||||
Assert.assertEquals("qux", dataMap.get("baz"));
|
||||
}
|
||||
}
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.apache.druid.testing.junit;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.errorprone.annotations.concurrent.GuardedBy;
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.core.LogEvent;
|
||||
|
@ -28,8 +30,8 @@ import org.apache.logging.log4j.core.config.Configuration;
|
|||
import org.apache.logging.log4j.core.config.LoggerConfig;
|
||||
import org.junit.rules.ExternalResource;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
/**
|
||||
* JUnit rule to capture a class's logger output to an in-memory buffer to allow verification of log messages in tests.
|
||||
|
@ -73,6 +75,14 @@ public class LoggerCaptureRule extends ExternalResource
|
|||
inMemoryAppender.clearLogEvents();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the captured
|
||||
*/
|
||||
public void awaitLogEvents() throws InterruptedException
|
||||
{
|
||||
inMemoryAppender.awaitLogEvents();
|
||||
}
|
||||
|
||||
private static class InMemoryAppender extends AbstractAppender
|
||||
{
|
||||
static final String NAME = InMemoryAppender.class.getName();
|
||||
|
@ -80,32 +90,51 @@ public class LoggerCaptureRule extends ExternalResource
|
|||
private final String targetLoggerName;
|
||||
|
||||
// logEvents has concurrent iteration and modification in CuratorModuleTest::exitsJvmWhenMaxRetriesExceeded(), needs to be thread safe
|
||||
private final CopyOnWriteArrayList<LogEvent> logEvents;
|
||||
@GuardedBy("logEvents")
|
||||
private final List<LogEvent> logEvents;
|
||||
|
||||
InMemoryAppender(Class<?> targetClass)
|
||||
{
|
||||
super(NAME, null, null);
|
||||
targetLoggerName = targetClass.getName();
|
||||
logEvents = new CopyOnWriteArrayList<>();
|
||||
logEvents = new ArrayList<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void append(LogEvent logEvent)
|
||||
{
|
||||
synchronized (logEvents) {
|
||||
if (logEvent.getLoggerName().equals(targetLoggerName)) {
|
||||
logEvents.add(logEvent);
|
||||
logEvents.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
List<LogEvent> getLogEvents()
|
||||
{
|
||||
return logEvents;
|
||||
synchronized (logEvents) {
|
||||
return ImmutableList.copyOf(logEvents);
|
||||
}
|
||||
}
|
||||
|
||||
void clearLogEvents()
|
||||
{
|
||||
synchronized (logEvents) {
|
||||
logEvents.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for "logEvents" to be nonempty. If it is already nonempty, return immediately.
|
||||
*/
|
||||
void awaitLogEvents() throws InterruptedException
|
||||
{
|
||||
synchronized (logEvents) {
|
||||
while (logEvents.isEmpty()) {
|
||||
logEvents.wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,9 @@ import org.apache.druid.guice.LazySingleton;
|
|||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.java.util.emitter.service.AlertBuilder;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.server.metrics.MetricsModule;
|
||||
import org.apache.zookeeper.ZooDefs;
|
||||
import org.apache.zookeeper.data.ACL;
|
||||
|
||||
|
@ -58,7 +61,6 @@ public class CuratorModule implements Module
|
|||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param haltOnFailedStart set to true if the JVM needs to be halted within 30 seconds of failed initialization
|
||||
* due to unhandled curator exceptions.
|
||||
*/
|
||||
|
@ -72,6 +74,7 @@ public class CuratorModule implements Module
|
|||
{
|
||||
JsonConfigProvider.bind(binder, CuratorConfig.CONFIG_PREFIX, ZkEnablementConfig.class);
|
||||
JsonConfigProvider.bind(binder, CuratorConfig.CONFIG_PREFIX, CuratorConfig.class);
|
||||
MetricsModule.register(binder, DruidConnectionStateListener.class);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -88,7 +91,8 @@ public class CuratorModule implements Module
|
|||
);
|
||||
}
|
||||
|
||||
RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, config.getMaxZkRetries());
|
||||
final RetryPolicy retryPolicy =
|
||||
new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, config.getMaxZkRetries());
|
||||
|
||||
return builder
|
||||
.ensembleProvider(new FixedEnsembleProvider(config.getZkHosts()))
|
||||
|
@ -105,7 +109,13 @@ public class CuratorModule implements Module
|
|||
*/
|
||||
@Provides
|
||||
@LazySingleton
|
||||
public CuratorFramework makeCurator(ZkEnablementConfig zkEnablementConfig, CuratorConfig config, Lifecycle lifecycle)
|
||||
public CuratorFramework makeCurator(
|
||||
final ZkEnablementConfig zkEnablementConfig,
|
||||
final CuratorConfig config,
|
||||
final DruidConnectionStateListener connectionStateListener,
|
||||
final ServiceEmitter emitter,
|
||||
final Lifecycle lifecycle
|
||||
)
|
||||
{
|
||||
if (!zkEnablementConfig.isEnabled()) {
|
||||
throw new RuntimeException("Zookeeper is disabled, cannot create CuratorFramework.");
|
||||
|
@ -113,7 +123,34 @@ public class CuratorModule implements Module
|
|||
|
||||
final CuratorFramework framework = createCurator(config);
|
||||
|
||||
framework.getConnectionStateListenable().addListener(connectionStateListener);
|
||||
addUnhandledErrorListener(framework, emitter, lifecycle);
|
||||
addLifecycleHandler(framework, lifecycle);
|
||||
|
||||
return framework;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide an instance of {@link DruidConnectionStateListener} for monitoring connection state.
|
||||
*/
|
||||
@Provides
|
||||
@LazySingleton
|
||||
public DruidConnectionStateListener makeConnectionStateListener(final ServiceEmitter emitter)
|
||||
{
|
||||
return new DruidConnectionStateListener(emitter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add unhandled error listener that shuts down the JVM.
|
||||
*/
|
||||
private void addUnhandledErrorListener(
|
||||
final CuratorFramework framework,
|
||||
final ServiceEmitter emitter,
|
||||
final Lifecycle lifecycle
|
||||
)
|
||||
{
|
||||
framework.getUnhandledErrorListenable().addListener((message, e) -> {
|
||||
emitter.emit(AlertBuilder.create("Unhandled Curator error").addThrowable(e));
|
||||
log.error(e, "Unhandled error in Curator, stopping server.");
|
||||
|
||||
if (haltOnFailedStart) {
|
||||
|
@ -140,7 +177,13 @@ public class CuratorModule implements Module
|
|||
|
||||
shutdown(lifecycle);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Add unhandled error listener that shuts down the JVM.
|
||||
*/
|
||||
private void addLifecycleHandler(final CuratorFramework framework, final Lifecycle lifecycle)
|
||||
{
|
||||
lifecycle.addHandler(
|
||||
new Lifecycle.Handler()
|
||||
{
|
||||
|
@ -159,23 +202,6 @@ public class CuratorModule implements Module
|
|||
}
|
||||
}
|
||||
);
|
||||
|
||||
return framework;
|
||||
}
|
||||
|
||||
static class SecuredACLProvider implements ACLProvider
|
||||
{
|
||||
@Override
|
||||
public List<ACL> getDefaultAcl()
|
||||
{
|
||||
return ZooDefs.Ids.CREATOR_ALL_ACL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ACL> getAclForPath(String path)
|
||||
{
|
||||
return ZooDefs.Ids.CREATOR_ALL_ACL;
|
||||
}
|
||||
}
|
||||
|
||||
private void shutdown(Lifecycle lifecycle)
|
||||
|
@ -191,4 +217,19 @@ public class CuratorModule implements Module
|
|||
System.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
private static class SecuredACLProvider implements ACLProvider
|
||||
{
|
||||
@Override
|
||||
public List<ACL> getDefaultAcl()
|
||||
{
|
||||
return ZooDefs.Ids.CREATOR_ALL_ACL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ACL> getAclForPath(String path)
|
||||
{
|
||||
return ZooDefs.Ids.CREATOR_ALL_ACL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.curator;
|
||||
|
||||
import com.google.errorprone.annotations.concurrent.GuardedBy;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.state.ConnectionState;
|
||||
import org.apache.curator.framework.state.ConnectionStateListener;
|
||||
import org.apache.druid.java.util.emitter.service.AlertBuilder;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||
import org.apache.druid.java.util.metrics.AbstractMonitor;
|
||||
|
||||
/**
|
||||
* Curator {@link ConnectionStateListener} that uses a {@link ServiceEmitter} to send alerts on ZK connection loss,
|
||||
* and emit metrics about ZK connection status.
|
||||
*/
|
||||
public class DruidConnectionStateListener extends AbstractMonitor implements ConnectionStateListener
|
||||
{
|
||||
private static final String METRIC_IS_CONNECTED = "zk/connected";
|
||||
private static final String METRIC_RECONNECT_TIME = "zk/reconnect/time";
|
||||
private static final int NIL = -1;
|
||||
|
||||
private final ServiceEmitter emitter;
|
||||
|
||||
/**
|
||||
* Current connection state.
|
||||
*/
|
||||
@GuardedBy("this")
|
||||
private ConnectionState currentState;
|
||||
|
||||
/**
|
||||
* Time given by {@link System#currentTimeMillis()} at last disconnect.
|
||||
*/
|
||||
@GuardedBy("this")
|
||||
private long lastDisconnectTime = NIL;
|
||||
|
||||
public DruidConnectionStateListener(final ServiceEmitter emitter)
|
||||
{
|
||||
this.emitter = emitter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stateChanged(CuratorFramework curatorFramework, ConnectionState newState)
|
||||
{
|
||||
if (newState.isConnected()) {
|
||||
final long disconnectDuration;
|
||||
|
||||
synchronized (this) {
|
||||
if (lastDisconnectTime != NIL) {
|
||||
disconnectDuration = Math.max(0, System.currentTimeMillis() - lastDisconnectTime);
|
||||
} else {
|
||||
disconnectDuration = NIL;
|
||||
}
|
||||
|
||||
currentState = newState;
|
||||
lastDisconnectTime = NIL;
|
||||
}
|
||||
|
||||
if (disconnectDuration != NIL) {
|
||||
emitter.emit(ServiceMetricEvent.builder().build(METRIC_RECONNECT_TIME, disconnectDuration));
|
||||
}
|
||||
} else {
|
||||
synchronized (this) {
|
||||
currentState = newState;
|
||||
lastDisconnectTime = Math.max(lastDisconnectTime, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
emitter.emit(AlertBuilder.create("ZooKeeper connection[%s]", newState));
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isConnected()
|
||||
{
|
||||
synchronized (this) {
|
||||
return currentState != null && currentState.isConnected();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean doMonitor(ServiceEmitter emitter)
|
||||
{
|
||||
emitter.emit(ServiceMetricEvent.builder().build(METRIC_IS_CONNECTED, isConnected() ? 1 : 0));
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -27,6 +27,8 @@ import org.apache.curator.retry.BoundedExponentialBackoffRetry;
|
|||
import org.apache.curator.retry.ExponentialBackoffRetry;
|
||||
import org.apache.druid.guice.LifecycleModule;
|
||||
import org.apache.druid.guice.StartupInjectorBuilder;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.server.metrics.NoopServiceEmitter;
|
||||
import org.apache.druid.testing.junit.LoggerCaptureRule;
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.core.LogEvent;
|
||||
|
@ -69,23 +71,25 @@ public final class CuratorModuleTest
|
|||
Assert.assertEquals(CuratorModule.MAX_SLEEP_TIME_MS, retryPolicy.getMaxSleepTimeMs());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 60_000L)
|
||||
public void exitsJvmWhenMaxRetriesExceeded() throws Exception
|
||||
{
|
||||
Properties props = new Properties();
|
||||
props.setProperty(CURATOR_CONNECTION_TIMEOUT_MS_KEY, "0");
|
||||
Injector injector = newInjector(props);
|
||||
CuratorFramework curatorFramework = createCuratorFramework(injector, 0);
|
||||
curatorFramework.start();
|
||||
|
||||
exit.expectSystemExitWithStatus(1);
|
||||
logger.clearLogEvents();
|
||||
exit.expectSystemExitWithStatus(1);
|
||||
|
||||
// This will result in a curator unhandled error since the connection timeout is 0 and retries are disabled
|
||||
CuratorFramework curatorFramework = createCuratorFramework(injector, 0);
|
||||
curatorFramework.start();
|
||||
curatorFramework.create().inBackground().forPath("/foo");
|
||||
|
||||
// org.apache.curator.framework.impl.CuratorFrameworkImpl logs "Background retry gave up" unhandled error twice
|
||||
logger.awaitLogEvents();
|
||||
List<LogEvent> loggingEvents = logger.getLogEvents();
|
||||
|
||||
Assert.assertTrue(
|
||||
"Logging events: " + loggingEvents,
|
||||
loggingEvents.stream()
|
||||
|
@ -121,6 +125,7 @@ public final class CuratorModuleTest
|
|||
.add(
|
||||
new LifecycleModule(),
|
||||
new CuratorModule(false),
|
||||
binder -> binder.bind(ServiceEmitter.class).to(NoopServiceEmitter.class),
|
||||
binder -> binder.bind(Properties.class).toInstance(props)
|
||||
)
|
||||
.build();
|
||||
|
|
|
@ -0,0 +1,150 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.curator;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.errorprone.annotations.concurrent.GuardedBy;
|
||||
import org.apache.curator.framework.state.ConnectionState;
|
||||
import org.apache.druid.java.util.emitter.core.Event;
|
||||
import org.apache.druid.server.metrics.NoopServiceEmitter;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.hamcrest.MatcherAssert;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class DruidConnectionStateListenerTest
|
||||
{
|
||||
private TestEmitter emitter;
|
||||
private DruidConnectionStateListener listener;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
emitter = new TestEmitter();
|
||||
listener = new DruidConnectionStateListener(emitter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_isConnected()
|
||||
{
|
||||
Assert.assertFalse(listener.isConnected());
|
||||
|
||||
listener.stateChanged(null, ConnectionState.CONNECTED);
|
||||
Assert.assertTrue(listener.isConnected());
|
||||
|
||||
listener.stateChanged(null, ConnectionState.SUSPENDED);
|
||||
Assert.assertFalse(listener.isConnected());
|
||||
|
||||
listener.stateChanged(null, ConnectionState.RECONNECTED);
|
||||
Assert.assertTrue(listener.isConnected());
|
||||
|
||||
listener.stateChanged(null, ConnectionState.LOST);
|
||||
Assert.assertFalse(listener.isConnected());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_doMonitor_init()
|
||||
{
|
||||
listener.doMonitor(emitter);
|
||||
Assert.assertEquals(1, emitter.getEvents().size());
|
||||
|
||||
final Map<String, Object> eventMap = emitter.getEvents().get(0).toMap();
|
||||
Assert.assertEquals("zk/connected", eventMap.get("metric"));
|
||||
Assert.assertEquals(0, eventMap.get("value"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_doMonitor_connected()
|
||||
{
|
||||
listener.stateChanged(null, ConnectionState.CONNECTED);
|
||||
listener.doMonitor(emitter);
|
||||
Assert.assertEquals(1, emitter.getEvents().size());
|
||||
|
||||
final Map<String, Object> eventMap = emitter.getEvents().get(0).toMap();
|
||||
Assert.assertEquals("zk/connected", eventMap.get("metric"));
|
||||
Assert.assertEquals(1, eventMap.get("value"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_doMonitor_notConnected()
|
||||
{
|
||||
listener.stateChanged(null, ConnectionState.SUSPENDED);
|
||||
listener.doMonitor(emitter);
|
||||
Assert.assertEquals(2, emitter.getEvents().size()); // 2 because stateChanged emitted an alert
|
||||
|
||||
final Map<String, Object> eventMap = emitter.getEvents().get(1).toMap();
|
||||
Assert.assertEquals("zk/connected", eventMap.get("metric"));
|
||||
Assert.assertEquals(0, eventMap.get("value"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_suspendedAlert()
|
||||
{
|
||||
listener.stateChanged(null, ConnectionState.SUSPENDED);
|
||||
Assert.assertEquals(1, emitter.getEvents().size());
|
||||
|
||||
final Map<String, Object> alertMap = emitter.getEvents().get(0).toMap();
|
||||
Assert.assertEquals("alerts", alertMap.get("feed"));
|
||||
Assert.assertEquals("ZooKeeper connection[SUSPENDED]", alertMap.get("description"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_reconnectedMetric()
|
||||
{
|
||||
listener.stateChanged(null, ConnectionState.SUSPENDED);
|
||||
Assert.assertEquals(1, emitter.getEvents().size()); // the first stateChanged emits an alert
|
||||
|
||||
listener.stateChanged(null, ConnectionState.RECONNECTED);
|
||||
Assert.assertEquals(2, emitter.getEvents().size()); // the second stateChanged emits a metric
|
||||
|
||||
final Map<String, Object> eventMap = emitter.getEvents().get(1).toMap();
|
||||
Assert.assertEquals("metrics", eventMap.get("feed"));
|
||||
Assert.assertEquals("zk/reconnect/time", eventMap.get("metric"));
|
||||
MatcherAssert.assertThat(eventMap.get("value"), CoreMatchers.instanceOf(Long.class));
|
||||
MatcherAssert.assertThat(((Number) eventMap.get("value")).longValue(), Matchers.greaterThanOrEqualTo(0L));
|
||||
}
|
||||
|
||||
private static class TestEmitter extends NoopServiceEmitter
|
||||
{
|
||||
@GuardedBy("events")
|
||||
private final List<Event> events = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public void emit(Event event)
|
||||
{
|
||||
synchronized (events) {
|
||||
events.add(event);
|
||||
}
|
||||
}
|
||||
|
||||
public List<Event> getEvents()
|
||||
{
|
||||
synchronized (events) {
|
||||
return ImmutableList.copyOf(events);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,8 +19,8 @@
|
|||
|
||||
package org.apache.druid.server.metrics;
|
||||
|
||||
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||
import org.apache.druid.java.util.emitter.core.Event;
|
||||
import org.apache.druid.java.util.emitter.service.AlertBuilder;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -28,7 +28,7 @@ import java.util.Map;
|
|||
|
||||
public class ExceptionCapturingServiceEmitter extends ServiceEmitter
|
||||
{
|
||||
private volatile Class exceptionClass;
|
||||
private volatile String exceptionClass;
|
||||
private volatile String exceptionMessage;
|
||||
private volatile String stackTrace;
|
||||
|
||||
|
@ -42,10 +42,10 @@ public class ExceptionCapturingServiceEmitter extends ServiceEmitter
|
|||
{
|
||||
//noinspection unchecked
|
||||
final Map<String, Object> dataMap = (Map<String, Object>) event.toMap().get("data");
|
||||
final Class exceptionClass = (Class) dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY);
|
||||
final String exceptionClass = (String) dataMap.get(AlertBuilder.EXCEPTION_TYPE_KEY);
|
||||
if (exceptionClass != null) {
|
||||
final String exceptionMessage = (String) dataMap.get(EmittingLogger.EXCEPTION_MESSAGE_KEY);
|
||||
final String stackTrace = (String) dataMap.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY);
|
||||
final String exceptionMessage = (String) dataMap.get(AlertBuilder.EXCEPTION_MESSAGE_KEY);
|
||||
final String stackTrace = (String) dataMap.get(AlertBuilder.EXCEPTION_STACK_TRACE_KEY);
|
||||
this.exceptionClass = exceptionClass;
|
||||
this.exceptionMessage = exceptionMessage;
|
||||
this.stackTrace = stackTrace;
|
||||
|
@ -53,7 +53,7 @@ public class ExceptionCapturingServiceEmitter extends ServiceEmitter
|
|||
}
|
||||
|
||||
@Nullable
|
||||
public Class getExceptionClass()
|
||||
public String getExceptionClass()
|
||||
{
|
||||
return exceptionClass;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue