Add metrics for ingest/bytes/received for EventReceiverFirehose

review comments

review comments
This commit is contained in:
Nishant 2015-12-30 22:07:41 +05:30
parent d413808e66
commit 14989f272d
6 changed files with 101 additions and 27 deletions

View File

@ -171,7 +171,8 @@ The following metric is only available if the EventReceiverFirehoseMonitor modul
|Metric|Description|Dimensions|Normal Value| |Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------| |------|-----------|----------|------------|
|`ingest/events/buffered`|Number of events queued in the EventReceiverFirehose's buffer|serviceName, bufferCapacity.|Equal to current # of events in the buffer queue.| |`ingest/events/buffered`|Number of events queued in the EventReceiverFirehose's buffer|serviceName, dataSource, taskId, bufferCapacity.|Equal to current # of events in the buffer queue.|
|`ingest/bytes/received`|Number of bytes received by the EventReceiverFirehose.|serviceName, dataSource, taskId.|Varies.|
## Sys ## Sys

View File

@ -31,6 +31,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.CountingInputStream;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
@ -58,6 +59,7 @@ import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* Builds firehoses that accept events through the {@link io.druid.segment.realtime.firehose.EventReceiver} interface. Can also register these * Builds firehoses that accept events through the {@link io.druid.segment.realtime.firehose.EventReceiver} interface. Can also register these
@ -137,6 +139,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
private volatile InputRow nextRow = null; private volatile InputRow nextRow = null;
private volatile boolean closed = false; private volatile boolean closed = false;
private final AtomicLong bytesReceived = new AtomicLong(0);
public EventReceiverFirehose(MapInputRowParser parser) public EventReceiverFirehose(MapInputRowParser parser)
{ {
@ -158,11 +161,11 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON; final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON;
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
CountingInputStream countingInputStream = new CountingInputStream(in);
Collection<Map<String, Object>> events = null; Collection<Map<String, Object>> events = null;
try { try {
events = objectMapper.readValue( events = objectMapper.readValue(
in, new TypeReference<Collection<Map<String, Object>>>() countingInputStream, new TypeReference<Collection<Map<String, Object>>>()
{ {
} }
); );
@ -170,6 +173,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
catch (IOException e) { catch (IOException e) {
return Response.serverError().entity(ImmutableMap.<String, Object>of("error", e.getMessage())).build(); return Response.serverError().entity(ImmutableMap.<String, Object>of("error", e.getMessage())).build();
} }
finally {
bytesReceived.addAndGet(countingInputStream.getCount());
}
log.debug("Adding %,d events to firehose: %s", events.size(), serviceName); log.debug("Adding %,d events to firehose: %s", events.size(), serviceName);
final List<InputRow> rows = Lists.newArrayList(); final List<InputRow> rows = Lists.newArrayList();
@ -256,6 +262,12 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
return bufferSize; return bufferSize;
} }
@Override
public long getBytesReceived()
{
return bytesReceived.get();
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {

View File

@ -38,4 +38,11 @@ public interface EventReceiverFirehoseMetric
* Return the capacity of the buffer. * Return the capacity of the buffer.
*/ */
int getCapacity(); int getCapacity();
/**
* Return the number of bytes received by the firehose.
*/
long getBytesReceived();
} }

View File

@ -19,24 +19,39 @@
package io.druid.server.metrics; package io.druid.server.metrics;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.AbstractMonitor; import com.metamx.metrics.AbstractMonitor;
import com.metamx.metrics.KeyedDiff;
import com.metamx.metrics.MonitorUtils;
import io.druid.query.DruidMetrics;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
public class EventReceiverFirehoseMonitor extends AbstractMonitor public class EventReceiverFirehoseMonitor extends AbstractMonitor
{ {
private final EventReceiverFirehoseRegister register; private final EventReceiverFirehoseRegister register;
private final KeyedDiff keyedDiff = new KeyedDiff();
private final Map<String, String[]> dimensions;
@Inject @Inject
public EventReceiverFirehoseMonitor( public EventReceiverFirehoseMonitor(
EventReceiverFirehoseRegister eventReceiverFirehoseRegister EventReceiverFirehoseRegister eventReceiverFirehoseRegister,
Properties props
) )
{ {
this.register = eventReceiverFirehoseRegister; this.register = eventReceiverFirehoseRegister;
this.dimensions = MonitorsConfig.extractDimensions(
props,
Lists.newArrayList(DruidMetrics.DATASOURCE, DruidMetrics.TASK_ID)
);
} }
@Override @Override
@ -46,16 +61,32 @@ public class EventReceiverFirehoseMonitor extends AbstractMonitor
final String serviceName = entry.getKey(); final String serviceName = entry.getKey();
final EventReceiverFirehoseMetric metric = entry.getValue(); final EventReceiverFirehoseMetric metric = entry.getValue();
final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder() final ServiceMetricEvent.Builder builder = createEventBuilder(serviceName)
.setDimension("serviceName", serviceName)
.setDimension( .setDimension(
"bufferCapacity", "bufferCapacity",
String.valueOf(metric.getCapacity()) String.valueOf(metric.getCapacity())
); );
emitter.emit(builder.build("ingest/events/buffered", metric.getCurrentBufferSize())); emitter.emit(builder.build("ingest/events/buffered", metric.getCurrentBufferSize()));
Map<String, Long> diff = keyedDiff.to(
serviceName,
ImmutableMap.of("ingest/bytes/received", metric.getBytesReceived())
);
if (diff != null) {
final ServiceMetricEvent.Builder eventBuilder = createEventBuilder(serviceName);
for (Map.Entry<String, Long> diffEntry : diff.entrySet()) {
emitter.emit(eventBuilder.build(diffEntry.getKey(), diffEntry.getValue()));
}
}
} }
return true; return true;
} }
private ServiceMetricEvent.Builder createEventBuilder(String serviceName)
{
ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder()
.setDimension("serviceName", serviceName);
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
return builder;
}
} }

View File

@ -40,6 +40,7 @@ import io.druid.guice.DruidBinders;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton; import io.druid.guice.LazySingleton;
import io.druid.guice.ManageLifecycle; import io.druid.guice.ManageLifecycle;
import io.druid.query.DruidMetrics;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -108,35 +109,37 @@ public class MetricsModule implements Module
@ManageLifecycle @ManageLifecycle
public JvmMonitor getJvmMonitor(Properties props) public JvmMonitor getJvmMonitor(Properties props)
{ {
return new JvmMonitor(getDimensions(props)); return new JvmMonitor(MonitorsConfig.extractDimensions(props,
Lists.newArrayList(
DruidMetrics.DATASOURCE,
DruidMetrics.TASK_ID
)
));
} }
@Provides @Provides
@ManageLifecycle @ManageLifecycle
public JvmCpuMonitor getJvmCpuMonitor(Properties props) public JvmCpuMonitor getJvmCpuMonitor(Properties props)
{ {
return new JvmCpuMonitor(getDimensions(props)); return new JvmCpuMonitor(MonitorsConfig.extractDimensions(props,
Lists.newArrayList(
DruidMetrics.DATASOURCE,
DruidMetrics.TASK_ID
)
));
} }
@Provides @Provides
@ManageLifecycle @ManageLifecycle
public SysMonitor getSysMonitor(Properties props) public SysMonitor getSysMonitor(Properties props)
{ {
return new SysMonitor(getDimensions(props)); return new SysMonitor(MonitorsConfig.extractDimensions(props,
Lists.newArrayList(
DruidMetrics.DATASOURCE,
DruidMetrics.TASK_ID
)
));
} }
private Map<String, String[]> getDimensions(Properties props)
{
Map<String, String[]> dimensions = new HashMap<>();
for (String property : props.stringPropertyNames()) {
if (property.startsWith(MonitorsConfig.METRIC_DIMENSION_PREFIX)) {
dimensions.put(
property.substring(MonitorsConfig.METRIC_DIMENSION_PREFIX.length()),
new String[]{props.getProperty(property)}
);
}
}
return dimensions;
}
} }

View File

@ -24,7 +24,10 @@ import com.google.common.collect.Lists;
import com.metamx.metrics.Monitor; import com.metamx.metrics.Monitor;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Properties;
/** /**
*/ */
@ -48,4 +51,21 @@ public class MonitorsConfig
"monitors=" + monitors + "monitors=" + monitors +
'}'; '}';
} }
public static Map<String, String[]> extractDimensions(Properties props, List<String> dimensions)
{
Map<String, String[]> dimensionsMap = new HashMap<>();
for (String property : props.stringPropertyNames()) {
if (property.startsWith(MonitorsConfig.METRIC_DIMENSION_PREFIX)) {
String dimension = property.substring(MonitorsConfig.METRIC_DIMENSION_PREFIX.length());
if (dimensions.contains(dimension)) {
dimensionsMap.put(
dimension,
new String[]{props.getProperty(property)}
);
}
}
}
return dimensionsMap;
}
} }