diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 89070a07701..253e785f629 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -117,6 +117,7 @@ The following monitors are available: |`io.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical nodes.| |`com.metamx.metrics.JvmMonitor`|Reports JVM-related statistics.| |`io.druid.segment.realtime.RealtimeMetricsMonitor`|Reports statistics on Realtime nodes.| +|`io.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.| ### Emitting Metrics diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md index b8317ad78b3..c87e3b788f2 100644 --- a/docs/content/operations/metrics.md +++ b/docs/content/operations/metrics.md @@ -161,6 +161,14 @@ These metrics are only available if the JVMMonitor module is included. |`jvm/gc/count`|Garbage collection count.|gcName.|< 100| |`jvm/gc/time`|Garbage collection time.|gcName.|< 1s| +### EventReceiverFirehose + +The following metric is only available if the EventReceiverFirehoseMonitor module is included. + +|Metric|Description|Dimensions|Normal Value| +|------|-----------|----------|------------| +|`ingest/events/buffered`|Number of events queued in the EventReceiverFirehose's buffer|serviceName, bufferCapacity.|Equal to current # of events in the buffer queue.| + ## Sys These metrics are only available if the SysMonitor module is included. diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java index cb6c947dd03..c58147f0501 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java @@ -33,12 +33,12 @@ import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; -import io.druid.data.input.Rows; import io.druid.data.input.impl.MapInputRowParser; - import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; -import java.io.InputStream; +import io.druid.server.metrics.EventReceiverFirehoseMetric; +import io.druid.server.metrics.EventReceiverFirehoseRegister; + import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.POST; @@ -48,6 +48,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.IOException; +import java.io.InputStream; import java.util.Collection; import java.util.List; import java.util.Map; @@ -70,6 +71,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory chatHandlerProvider; private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; + private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister; @JsonCreator public EventReceiverFirehoseFactory( @@ -77,7 +79,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory buffer; private final MapInputRowParser parser; @@ -243,12 +248,25 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory entry : register.getMetrics()) { + final String serviceName = entry.getKey(); + final EventReceiverFirehoseMetric metric = entry.getValue(); + + final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder() + .setDimension("serviceName", serviceName) + .setDimension( + "bufferCapacity", + String.valueOf(metric.getCapacity()) + ); + + emitter.emit(builder.build("ingest/events/buffered", metric.getCurrentBufferSize())); + } + + return true; + } +} diff --git a/server/src/main/java/io/druid/server/metrics/EventReceiverFirehoseRegister.java b/server/src/main/java/io/druid/server/metrics/EventReceiverFirehoseRegister.java new file mode 100644 index 00000000000..da1f073e461 --- /dev/null +++ b/server/src/main/java/io/druid/server/metrics/EventReceiverFirehoseRegister.java @@ -0,0 +1,56 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.metrics; + +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class EventReceiverFirehoseRegister +{ + + private static final Logger log = new Logger(EventReceiverFirehoseRegister.class); + + private final ConcurrentMap metrics = new ConcurrentHashMap<>(); + + public void register(String serviceName, EventReceiverFirehoseMetric metric) + { + log.info("Registering EventReceiverFirehoseMetric for service [%s]", serviceName); + if (metrics.putIfAbsent(serviceName, metric) != null) { + throw new ISE("Service [%s] is already registered!", serviceName); + } + } + + public Iterable> getMetrics() + { + return metrics.entrySet(); + } + + public void unregister(String serviceName) + { + log.info("Unregistering EventReceiverFirehoseMetric for service [%s]", serviceName); + if (metrics.remove(serviceName) == null) { + log.warn("Unregistering a non-exist service. Service [%s] never exists."); + } + } +} diff --git a/server/src/main/java/io/druid/server/metrics/MetricsModule.java b/server/src/main/java/io/druid/server/metrics/MetricsModule.java index a395b8b317e..2d80a760d2e 100644 --- a/server/src/main/java/io/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/io/druid/server/metrics/MetricsModule.java @@ -33,6 +33,7 @@ import com.metamx.metrics.MonitorScheduler; import io.druid.concurrent.Execs; import io.druid.guice.DruidBinders; import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; import io.druid.guice.ManageLifecycle; import java.util.List; @@ -59,6 +60,8 @@ public class MetricsModule implements Module DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum. + binder.bind(EventReceiverFirehoseRegister.class).in(LazySingleton.class); + // Instantiate eagerly so that we get everything registered and put into the Lifecycle binder.bind(Key.get(MonitorScheduler.class, Names.named("ForTheEagerness"))) .to(MonitorScheduler.class) diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java new file mode 100644 index 00000000000..9df0170f70c --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.firehose; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.metamx.common.ISE; +import io.druid.concurrent.Execs; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.JSONParseSpec; +import io.druid.data.input.impl.MapInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.metrics.EventReceiverFirehoseMetric; +import io.druid.server.metrics.EventReceiverFirehoseRegister; +import org.apache.commons.io.IOUtils; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class EventReceiverFirehoseTest +{ + private static final int CAPACITY = 300; + private static final int NUM_EVENTS = 100; + private static final String SERVICE_NAME = "test_firehose"; + + private final String inputRow = "[{\n" + + " \"timestamp\":123,\n" + + " \"d1\":\"v1\"\n" + + "}]"; + + private EventReceiverFirehoseFactory eventReceiverFirehoseFactory; + private EventReceiverFirehoseFactory.EventReceiverFirehose firehose; + private EventReceiverFirehoseRegister register = new EventReceiverFirehoseRegister(); + private HttpServletRequest req; + + @Before + public void setUp() throws Exception + { + req = EasyMock.createMock(HttpServletRequest.class); + eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory( + SERVICE_NAME, + CAPACITY, + null, + new DefaultObjectMapper(), + new DefaultObjectMapper(), + register + ); + firehose = (EventReceiverFirehoseFactory.EventReceiverFirehose) eventReceiverFirehoseFactory.connect( + new MapInputRowParser( + new JSONParseSpec( + new TimestampSpec( + "timestamp", + "auto", + null + ), new DimensionsSpec(ImmutableList.of("d1"), null, null) + ) + ) + ); + } + + @Test + public void testSingleThread() throws IOException + { + EasyMock.expect(req.getContentType()).andReturn("application/json").times(NUM_EVENTS); + EasyMock.replay(req); + + for (int i = 0; i < NUM_EVENTS; ++i) { + final InputStream inputStream = IOUtils.toInputStream(inputRow); + firehose.addAll(inputStream, req); + Assert.assertEquals(i + 1, firehose.getCurrentBufferSize()); + inputStream.close(); + } + + EasyMock.verify(req); + + final Iterable> metrics = register.getMetrics(); + Assert.assertEquals(1, Iterables.size(metrics)); + + final Map.Entry entry = Iterables.getLast(metrics); + Assert.assertEquals(SERVICE_NAME, entry.getKey()); + Assert.assertEquals(CAPACITY, entry.getValue().getCapacity()); + Assert.assertEquals(CAPACITY, firehose.getCapacity()); + Assert.assertEquals(NUM_EVENTS, entry.getValue().getCurrentBufferSize()); + Assert.assertEquals(NUM_EVENTS, firehose.getCurrentBufferSize()); + + for (int i = NUM_EVENTS - 1; i >= 0; --i) { + Assert.assertTrue(firehose.hasMore()); + Assert.assertNotNull(firehose.nextRow()); + Assert.assertEquals(i, firehose.getCurrentBufferSize()); + } + + Assert.assertEquals(CAPACITY, entry.getValue().getCapacity()); + Assert.assertEquals(CAPACITY, firehose.getCapacity()); + Assert.assertEquals(0, entry.getValue().getCurrentBufferSize()); + Assert.assertEquals(0, firehose.getCurrentBufferSize()); + + firehose.close(); + Assert.assertFalse(firehose.hasMore()); + Assert.assertEquals(0, Iterables.size(register.getMetrics())); + + } + + @Test + public void testMultipleThreads() throws InterruptedException, IOException, TimeoutException, ExecutionException + { + EasyMock.expect(req.getContentType()).andReturn("application/json").times(2 * NUM_EVENTS); + EasyMock.replay(req); + + final ExecutorService executorService = Execs.singleThreaded("single_thread"); + final Future future = executorService.submit( + new Callable() + { + @Override + public Boolean call() throws Exception + { + for (int i = 0; i < NUM_EVENTS; ++i) { + final InputStream inputStream = IOUtils.toInputStream(inputRow); + firehose.addAll(inputStream, req); + inputStream.close(); + } + return true; + } + } + ); + + for (int i = 0; i < NUM_EVENTS; ++i) { + final InputStream inputStream = IOUtils.toInputStream(inputRow); + firehose.addAll(inputStream, req); + inputStream.close(); + } + + future.get(10, TimeUnit.SECONDS); + + EasyMock.verify(req); + + final Iterable> metrics = register.getMetrics(); + Assert.assertEquals(1, Iterables.size(metrics)); + + final Map.Entry entry = Iterables.getLast(metrics); + + Assert.assertEquals(SERVICE_NAME, entry.getKey()); + Assert.assertEquals(CAPACITY, entry.getValue().getCapacity()); + Assert.assertEquals(CAPACITY, firehose.getCapacity()); + Assert.assertEquals(2 * NUM_EVENTS, entry.getValue().getCurrentBufferSize()); + Assert.assertEquals(2 * NUM_EVENTS, firehose.getCurrentBufferSize()); + + for (int i = 2 * NUM_EVENTS - 1; i >= 0; --i) { + Assert.assertTrue(firehose.hasMore()); + Assert.assertNotNull(firehose.nextRow()); + Assert.assertEquals(i, firehose.getCurrentBufferSize()); + } + + Assert.assertEquals(CAPACITY, entry.getValue().getCapacity()); + Assert.assertEquals(CAPACITY, firehose.getCapacity()); + Assert.assertEquals(0, entry.getValue().getCurrentBufferSize()); + Assert.assertEquals(0, firehose.getCurrentBufferSize()); + + firehose.close(); + Assert.assertFalse(firehose.hasMore()); + Assert.assertEquals(0, Iterables.size(register.getMetrics())); + + executorService.shutdownNow(); + } + + @Test(expected = ISE.class) + public void testDuplicateRegistering() throws IOException + { + EventReceiverFirehoseFactory eventReceiverFirehoseFactory2 = new EventReceiverFirehoseFactory( + SERVICE_NAME, + CAPACITY, + null, + new DefaultObjectMapper(), + new DefaultObjectMapper(), + register + ); + EventReceiverFirehoseFactory.EventReceiverFirehose firehose2 = + (EventReceiverFirehoseFactory.EventReceiverFirehose) eventReceiverFirehoseFactory2 + .connect( + new MapInputRowParser( + new JSONParseSpec( + new TimestampSpec( + "timestamp", + "auto", + null + ), new DimensionsSpec(ImmutableList.of("d1"), null, null) + ) + ) + ); + } +}