Merge pull request #2174 from metamx/ingest-size-metrics

Add metrics for ingest/bytes/received for EventReceiverFirehose
Fangjin Yang 2016-01-06 22:05:55 -08:00
commit 3048b1f0a5
6 changed files with 101 additions and 27 deletions

@@ -171,7 +171,8 @@ The following metric is only available if the EventReceiverFirehoseMonitor modul
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/events/buffered`|Number of events queued in the EventReceiverFirehose's buffer|serviceName, bufferCapacity.|Equal to current # of events in the buffer queue.|
|`ingest/events/buffered`|Number of events queued in the EventReceiverFirehose's buffer|serviceName, dataSource, taskId, bufferCapacity.|Equal to current # of events in the buffer queue.|
|`ingest/bytes/received`|Number of bytes received by the EventReceiverFirehose.|serviceName, dataSource, taskId.|Varies.|
## Sys

@@ -31,6 +31,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.CountingInputStream;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@@ -58,6 +59,7 @@ import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Builds firehoses that accept events through the {@link io.druid.segment.realtime.firehose.EventReceiver} interface. Can also register these
@@ -137,6 +139,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
private volatile InputRow nextRow = null;
private volatile boolean closed = false;
private final AtomicLong bytesReceived = new AtomicLong(0);
public EventReceiverFirehose(MapInputRowParser parser)
{
@@ -158,11 +161,11 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON;
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
CountingInputStream countingInputStream = new CountingInputStream(in);
Collection<Map<String, Object>> events = null;
try {
events = objectMapper.readValue(
in, new TypeReference<Collection<Map<String, Object>>>()
countingInputStream, new TypeReference<Collection<Map<String, Object>>>()
{
}
);
@@ -170,6 +173,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
catch (IOException e) {
return Response.serverError().entity(ImmutableMap.<String, Object>of("error", e.getMessage())).build();
}
finally {
bytesReceived.addAndGet(countingInputStream.getCount());
}
log.debug("Adding %,d events to firehose: %s", events.size(), serviceName);
final List<InputRow> rows = Lists.newArrayList();
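
The change above wraps the request body in Guava's CountingInputStream and adds the count to an AtomicLong in a finally block, so bytes are tallied even when deserialization throws. A standalone sketch of that pattern follows; the class and variable names are illustrative, not part of the patch.

```java
// Minimal sketch of the byte-counting pattern (illustrative names, not from the patch).
import com.google.common.io.CountingInputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

public class BytesReceivedSketch
{
  private static final AtomicLong bytesReceived = new AtomicLong(0);

  public static void main(String[] args) throws IOException
  {
    InputStream body = new ByteArrayInputStream(
        "[{\"timestamp\":\"2016-01-06\",\"value\":1}]".getBytes(StandardCharsets.UTF_8)
    );
    CountingInputStream countingInputStream = new CountingInputStream(body);
    try {
      // Stand-in for objectMapper.readValue(countingInputStream, ...): consume the stream.
      while (countingInputStream.read() != -1) {
        // drain
      }
    }
    finally {
      // getCount() is the number of bytes read through the wrapper so far; it is
      // added to the running total even if reading/parsing failed part-way.
      bytesReceived.addAndGet(countingInputStream.getCount());
    }
    System.out.println("ingest/bytes/received so far: " + bytesReceived.get());
  }
}
```
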
@@ -256,6 +262,12 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
return bufferSize;
}
@Override
public long getBytesReceived()
{
return bytesReceived.get();
}
@Override
public void close() throws IOException
{

@@ -38,4 +38,11 @@ public interface EventReceiverFirehoseMetric
* Return the capacity of the buffer.
*/
int getCapacity();
/**
* Return the number of bytes received by the firehose.
*/
long getBytesReceived();
}

@@ -19,24 +19,39 @@
package io.druid.server.metrics;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.AbstractMonitor;
import com.metamx.metrics.KeyedDiff;
import com.metamx.metrics.MonitorUtils;
import io.druid.query.DruidMetrics;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
public class EventReceiverFirehoseMonitor extends AbstractMonitor
{
private final EventReceiverFirehoseRegister register;
private final KeyedDiff keyedDiff = new KeyedDiff();
private final Map<String, String[]> dimensions;
@Inject
public EventReceiverFirehoseMonitor(
EventReceiverFirehoseRegister eventReceiverFirehoseRegister
EventReceiverFirehoseRegister eventReceiverFirehoseRegister,
Properties props
)
{
this.register = eventReceiverFirehoseRegister;
this.dimensions = MonitorsConfig.extractDimensions(
props,
Lists.newArrayList(DruidMetrics.DATASOURCE, DruidMetrics.TASK_ID)
);
}
@Override
@@ -46,16 +61,32 @@ public class EventReceiverFirehoseMonitor extends AbstractMonitor
final String serviceName = entry.getKey();
final EventReceiverFirehoseMetric metric = entry.getValue();
final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder()
.setDimension("serviceName", serviceName)
.setDimension(
"bufferCapacity",
String.valueOf(metric.getCapacity())
);
final ServiceMetricEvent.Builder builder = createEventBuilder(serviceName)
.setDimension(
"bufferCapacity",
String.valueOf(metric.getCapacity())
);
emitter.emit(builder.build("ingest/events/buffered", metric.getCurrentBufferSize()));
Map<String, Long> diff = keyedDiff.to(
serviceName,
ImmutableMap.of("ingest/bytes/received", metric.getBytesReceived())
);
if (diff != null) {
final ServiceMetricEvent.Builder eventBuilder = createEventBuilder(serviceName);
for (Map.Entry<String, Long> diffEntry : diff.entrySet()) {
emitter.emit(eventBuilder.build(diffEntry.getKey(), diffEntry.getValue()));
}
}
}
return true;
}
private ServiceMetricEvent.Builder createEventBuilder(String serviceName)
{
ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder()
.setDimension("serviceName", serviceName);
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
return builder;
}
}
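
getBytesReceived() is a cumulative counter, so the monitor emits the increase since the previous emission rather than the running total; that is what the KeyedDiff call above does, and the null check handles the case where there is no earlier snapshot for the key yet. A minimal sketch of that behaviour, with an illustrative key and made-up byte counts:

```java
// Sketch of the KeyedDiff delta behaviour (key and numbers are illustrative).
import com.google.common.collect.ImmutableMap;
import com.metamx.metrics.KeyedDiff;

import java.util.Map;

public class KeyedDiffSketch
{
  public static void main(String[] args)
  {
    KeyedDiff keyedDiff = new KeyedDiff();

    // First poll: no previous value for this key, so nothing would be emitted.
    Map<String, Long> first = keyedDiff.to(
        "eventReceiverFirehose:wikipedia",
        ImmutableMap.of("ingest/bytes/received", 1000L)
    );
    System.out.println("first poll:  " + first);   // null

    // Second poll: the cumulative counter grew from 1000 to 1500, so 500 is reported.
    Map<String, Long> second = keyedDiff.to(
        "eventReceiverFirehose:wikipedia",
        ImmutableMap.of("ingest/bytes/received", 1500L)
    );
    System.out.println("second poll: " + second);  // {ingest/bytes/received=500}
  }
}
```
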

@@ -40,6 +40,7 @@ import io.druid.guice.DruidBinders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.ManageLifecycle;
import io.druid.query.DruidMetrics;
import java.util.HashMap;
import java.util.List;
@@ -108,35 +109,37 @@ public class MetricsModule implements Module
@ManageLifecycle
public JvmMonitor getJvmMonitor(Properties props)
{
return new JvmMonitor(getDimensions(props));
return new JvmMonitor(MonitorsConfig.extractDimensions(props,
Lists.newArrayList(
DruidMetrics.DATASOURCE,
DruidMetrics.TASK_ID
)
));
}
@Provides
@ManageLifecycle
public JvmCpuMonitor getJvmCpuMonitor(Properties props)
{
return new JvmCpuMonitor(getDimensions(props));
return new JvmCpuMonitor(MonitorsConfig.extractDimensions(props,
Lists.newArrayList(
DruidMetrics.DATASOURCE,
DruidMetrics.TASK_ID
)
));
}
@Provides
@ManageLifecycle
public SysMonitor getSysMonitor(Properties props)
{
return new SysMonitor(getDimensions(props));
return new SysMonitor(MonitorsConfig.extractDimensions(props,
Lists.newArrayList(
DruidMetrics.DATASOURCE,
DruidMetrics.TASK_ID
)
));
}
private Map<String, String[]> getDimensions(Properties props)
{
Map<String, String[]> dimensions = new HashMap<>();
for (String property : props.stringPropertyNames()) {
if (property.startsWith(MonitorsConfig.METRIC_DIMENSION_PREFIX)) {
dimensions.put(
property.substring(MonitorsConfig.METRIC_DIMENSION_PREFIX.length()),
new String[]{props.getProperty(property)}
);
}
}
return dimensions;
}
}

@@ -24,7 +24,10 @@ import com.google.common.collect.Lists;
import com.metamx.metrics.Monitor;
import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
*/
@@ -48,4 +51,21 @@ public class MonitorsConfig
"monitors=" + monitors +
'}';
}
public static Map<String, String[]> extractDimensions(Properties props, List<String> dimensions)
{
Map<String, String[]> dimensionsMap = new HashMap<>();
for (String property : props.stringPropertyNames()) {
if (property.startsWith(MonitorsConfig.METRIC_DIMENSION_PREFIX)) {
String dimension = property.substring(MonitorsConfig.METRIC_DIMENSION_PREFIX.length());
if (dimensions.contains(dimension)) {
dimensionsMap.put(
dimension,
new String[]{props.getProperty(property)}
);
}
}
}
return dimensionsMap;
}
}
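
A hypothetical usage sketch of the new extractDimensions() helper: the substring() logic above implies each property name is METRIC_DIMENSION_PREFIX followed directly by the dimension name, and only dimensions in the caller's list are kept. The datasource and task-id values below are made up, and the sketch assumes METRIC_DIMENSION_PREFIX is accessible from outside the package.

```java
// Illustrative caller of MonitorsConfig.extractDimensions(); values are made up.
import com.google.common.collect.Lists;
import io.druid.query.DruidMetrics;
import io.druid.server.metrics.MonitorsConfig;

import java.util.Map;
import java.util.Properties;

public class ExtractDimensionsSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    // Property name = prefix + dimension name, per the substring() logic in extractDimensions().
    props.setProperty(MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.DATASOURCE, "wikipedia");
    props.setProperty(MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_ID, "index_wikipedia_2016-01-06");
    props.setProperty(MonitorsConfig.METRIC_DIMENSION_PREFIX + "host", "example.com"); // not in the list below, so dropped

    Map<String, String[]> dims = MonitorsConfig.extractDimensions(
        props,
        Lists.newArrayList(DruidMetrics.DATASOURCE, DruidMetrics.TASK_ID)
    );

    for (Map.Entry<String, String[]> e : dims.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue()[0]);
    }
    // Expected: dataSource -> wikipedia, taskId -> index_wikipedia_2016-01-06
  }
}
```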