mirror of https://github.com/apache/druid.git
Add ServiceStatusMonitor to monitor service health (#14443)
* Add OverlordStatusMonitor and CoordinatorStatusMonitor to monitor service leader status * make the monitor more general * resolve conflict * use Supplier pattern to provide metrics * reformat code and doc * move service specific tag to dimension * minor refine * update doc * reformat code * address comments * remove declared exception * bind HeartbeatSupplier conditionally in Coordinator
This commit is contained in:
parent
114380749d
commit
b7434be99e
|
@ -399,6 +399,7 @@ Metric monitoring is an essential part of Druid operations. The following monit
|
||||||
|`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.|
|
|`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.|
|
||||||
|`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics about task slot usage per emission period.|
|
|`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics about task slot usage per emission period.|
|
||||||
|`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting, the number of successful/failed tasks, and metrics about task slot usage for the reporting worker, per emission period. Only supported by middleManager node types.|
|
|`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting, the number of successful/failed tasks, and metrics about task slot usage for the reporting worker, per emission period. Only supported by middleManager node types.|
|
||||||
|
|`org.apache.druid.server.metrics.ServiceStatusMonitor`|Reports a heartbeat for the service.|
|
||||||
|
|
||||||
For example, you might configure monitors on all processes for system and JVM information within `common.runtime.properties` as follows:
|
For example, you might configure monitors on all processes for system and JVM information within `common.runtime.properties` as follows:
|
||||||
|
|
||||||
|
|
|
@ -329,6 +329,12 @@ If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configurati
|
||||||
|
|
||||||
## General Health
|
## General Health
|
||||||
|
|
||||||
|
### Service Health
|
||||||
|
|
||||||
|
|Metric|Description|Dimensions|Normal Value|
|
||||||
|
|------|-----------|----------|------------|
|
||||||
|
| `service/heartbeat` | Metric indicating the service is up. `ServiceStatusMonitor` must be enabled. |`leader` on the Overlord and Coordinator.|1|
|
||||||
|
|
||||||
### Historical
|
### Historical
|
||||||
|
|
||||||
|Metric|Description|Dimensions|Normal Value|
|
|Metric|Description|Dimensions|Normal Value|
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.server.metrics;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import com.google.inject.name.Named;
|
||||||
|
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||||
|
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||||
|
import org.apache.druid.java.util.metrics.AbstractMonitor;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reports a heartbeat for the service.
|
||||||
|
*/
|
||||||
|
public class ServiceStatusMonitor extends AbstractMonitor
|
||||||
|
{
|
||||||
|
|
||||||
|
@Named("heartbeat")
|
||||||
|
@Inject(optional = true)
|
||||||
|
Supplier<Map<String, Object>> heartbeatTagsSupplier = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean doMonitor(ServiceEmitter emitter)
|
||||||
|
{
|
||||||
|
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
|
||||||
|
if (heartbeatTagsSupplier != null && heartbeatTagsSupplier.get() != null) {
|
||||||
|
heartbeatTagsSupplier.get().forEach((k, v) -> {
|
||||||
|
builder.setDimension(k, v);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
emitter.emit(builder.build("service/heartbeat", 1));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,82 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.server.metrics;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
import org.apache.druid.java.util.metrics.StubServiceEmitter;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class ServiceStatusMonitorTest
|
||||||
|
{
|
||||||
|
|
||||||
|
private ServiceStatusMonitor monitor;
|
||||||
|
private Map<String, Object> heartbeatTags;
|
||||||
|
private Supplier<Map<String, Object>> heartbeatTagsSupplier = () -> heartbeatTags;
|
||||||
|
private static String HEARTBEAT_METRIC_KEY = "service/heartbeat";
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp()
|
||||||
|
{
|
||||||
|
monitor = new ServiceStatusMonitor();
|
||||||
|
heartbeatTags = new HashMap<>();
|
||||||
|
monitor.heartbeatTagsSupplier = heartbeatTagsSupplier;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDefaultHeartbeatReported()
|
||||||
|
{
|
||||||
|
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
|
||||||
|
monitor.doMonitor(emitter);
|
||||||
|
Assert.assertEquals(1, emitter.getEvents().size());
|
||||||
|
Assert.assertEquals(HEARTBEAT_METRIC_KEY, emitter.getEvents().get(0).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLeaderTag()
|
||||||
|
{
|
||||||
|
heartbeatTags.put("leader", 1);
|
||||||
|
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
|
||||||
|
monitor.doMonitor(emitter);
|
||||||
|
Assert.assertEquals(1, emitter.getEvents().size());
|
||||||
|
Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("leader"));
|
||||||
|
Assert.assertEquals(HEARTBEAT_METRIC_KEY, emitter.getEvents().get(0).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMoreThanOneTag()
|
||||||
|
{
|
||||||
|
heartbeatTags.put("leader", 1);
|
||||||
|
heartbeatTags.put("taskRunner", "http");
|
||||||
|
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
|
||||||
|
monitor.doMonitor(emitter);
|
||||||
|
Assert.assertEquals(1, emitter.getEvents().size());
|
||||||
|
Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("leader"));
|
||||||
|
Assert.assertEquals("http", emitter.getEvents().get(0).toMap().get("taskRunner"));
|
||||||
|
Assert.assertEquals(HEARTBEAT_METRIC_KEY, emitter.getEvents().get(0).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.github.rvesse.airline.annotations.Command;
|
import com.github.rvesse.airline.annotations.Command;
|
||||||
import com.google.common.base.Predicates;
|
import com.google.common.base.Predicates;
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.inject.Binder;
|
import com.google.inject.Binder;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
@ -31,6 +32,7 @@ import com.google.inject.Key;
|
||||||
import com.google.inject.Module;
|
import com.google.inject.Module;
|
||||||
import com.google.inject.Provider;
|
import com.google.inject.Provider;
|
||||||
import com.google.inject.Provides;
|
import com.google.inject.Provides;
|
||||||
|
import com.google.inject.TypeLiteral;
|
||||||
import com.google.inject.name.Names;
|
import com.google.inject.name.Names;
|
||||||
import com.google.inject.util.Providers;
|
import com.google.inject.util.Providers;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
|
@ -119,8 +121,10 @@ import org.eclipse.jetty.server.Server;
|
||||||
import org.joda.time.Duration;
|
import org.joda.time.Duration;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
@ -330,6 +334,10 @@ public class CliCoordinator extends ServerRunnable
|
||||||
binder.bind(TaskStorage.class).toProvider(Providers.of(null));
|
binder.bind(TaskStorage.class).toProvider(Providers.of(null));
|
||||||
binder.bind(TaskMaster.class).toProvider(Providers.of(null));
|
binder.bind(TaskMaster.class).toProvider(Providers.of(null));
|
||||||
binder.bind(RowIngestionMetersFactory.class).toProvider(Providers.of(null));
|
binder.bind(RowIngestionMetersFactory.class).toProvider(Providers.of(null));
|
||||||
|
// Bind HeartbeatSupplier only when the service operates independently of Overlord.
|
||||||
|
binder.bind(new TypeLiteral<Supplier<Map<String, Object>>>() {})
|
||||||
|
.annotatedWith(Names.named("heartbeat"))
|
||||||
|
.toProvider(HeartbeatSupplier.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
binder.bind(CoordinatorCustomDutyGroups.class)
|
binder.bind(CoordinatorCustomDutyGroups.class)
|
||||||
|
@ -459,4 +467,26 @@ public class CliCoordinator extends ServerRunnable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class HeartbeatSupplier implements Provider<Supplier<Map<String, Object>>>
|
||||||
|
{
|
||||||
|
private final DruidCoordinator coordinator;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public HeartbeatSupplier(DruidCoordinator coordinator)
|
||||||
|
{
|
||||||
|
this.coordinator = coordinator;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Supplier<Map<String, Object>> get()
|
||||||
|
{
|
||||||
|
return () -> {
|
||||||
|
Map<String, Object> heartbeatTags = new HashMap<>();
|
||||||
|
heartbeatTags.put("leader", coordinator.isLeader() ? 1 : 0);
|
||||||
|
|
||||||
|
return heartbeatTags;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.druid.cli;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.github.rvesse.airline.annotations.Command;
|
import com.github.rvesse.airline.annotations.Command;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.inject.Binder;
|
import com.google.inject.Binder;
|
||||||
|
@ -32,6 +33,7 @@ import com.google.inject.Provides;
|
||||||
import com.google.inject.TypeLiteral;
|
import com.google.inject.TypeLiteral;
|
||||||
import com.google.inject.multibindings.MapBinder;
|
import com.google.inject.multibindings.MapBinder;
|
||||||
import com.google.inject.multibindings.Multibinder;
|
import com.google.inject.multibindings.Multibinder;
|
||||||
|
import com.google.inject.name.Named;
|
||||||
import com.google.inject.name.Names;
|
import com.google.inject.name.Names;
|
||||||
import com.google.inject.servlet.GuiceFilter;
|
import com.google.inject.servlet.GuiceFilter;
|
||||||
import com.google.inject.util.Providers;
|
import com.google.inject.util.Providers;
|
||||||
|
@ -134,7 +136,9 @@ import org.eclipse.jetty.servlet.FilterHolder;
|
||||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||||
import org.eclipse.jetty.servlet.ServletHolder;
|
import org.eclipse.jetty.servlet.ServletHolder;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
@ -354,6 +358,19 @@ public class CliOverlord extends ServerRunnable
|
||||||
{
|
{
|
||||||
return TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig);
|
return TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Provides
|
||||||
|
@LazySingleton
|
||||||
|
@Named("heartbeat")
|
||||||
|
public Supplier<Map<String, Object>> getHeartbeatSupplier(TaskMaster taskMaster)
|
||||||
|
{
|
||||||
|
return () -> {
|
||||||
|
Map<String, Object> heartbeatTags = new HashMap<>();
|
||||||
|
heartbeatTags.put("leader", taskMaster.isLeader() ? 1 : 0);
|
||||||
|
|
||||||
|
return heartbeatTags;
|
||||||
|
};
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue