mirror of https://github.com/apache/druid.git
Merge pull request #1791 from guobingkun/event_receiver_firehose_monitor
EventReceiverFirehoseMonitor
This commit is contained in:
commit
cf779946ef
|
@ -117,6 +117,7 @@ The following monitors are available:
|
||||||
|`io.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical nodes.|
|
|`io.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical nodes.|
|
||||||
|`com.metamx.metrics.JvmMonitor`|Reports JVM-related statistics.|
|
|`com.metamx.metrics.JvmMonitor`|Reports JVM-related statistics.|
|
||||||
|`io.druid.segment.realtime.RealtimeMetricsMonitor`|Reports statistics on Realtime nodes.|
|
|`io.druid.segment.realtime.RealtimeMetricsMonitor`|Reports statistics on Realtime nodes.|
|
||||||
|
|`io.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.|
|
||||||
|
|
||||||
### Emitting Metrics
|
### Emitting Metrics
|
||||||
|
|
||||||
|
|
|
@ -161,6 +161,14 @@ These metrics are only available if the JVMMonitor module is included.
|
||||||
|`jvm/gc/count`|Garbage collection count.|gcName.|< 100|
|
|`jvm/gc/count`|Garbage collection count.|gcName.|< 100|
|
||||||
|`jvm/gc/time`|Garbage collection time.|gcName.|< 1s|
|
|`jvm/gc/time`|Garbage collection time.|gcName.|< 1s|
|
||||||
|
|
||||||
|
### EventReceiverFirehose
|
||||||
|
|
||||||
|
The following metric is only available if the EventReceiverFirehoseMonitor module is included.
|
||||||
|
|
||||||
|
|Metric|Description|Dimensions|Normal Value|
|
||||||
|
|------|-----------|----------|------------|
|
||||||
|
|`ingest/events/buffered`|Number of events queued in the EventReceiverFirehose's buffer|serviceName, bufferCapacity.|Equal to current # of events in the buffer queue.|
|
||||||
|
|
||||||
## Sys
|
## Sys
|
||||||
|
|
||||||
These metrics are only available if the SysMonitor module is included.
|
These metrics are only available if the SysMonitor module is included.
|
||||||
|
|
|
@ -33,12 +33,12 @@ import com.metamx.emitter.EmittingLogger;
|
||||||
import io.druid.data.input.Firehose;
|
import io.druid.data.input.Firehose;
|
||||||
import io.druid.data.input.FirehoseFactory;
|
import io.druid.data.input.FirehoseFactory;
|
||||||
import io.druid.data.input.InputRow;
|
import io.druid.data.input.InputRow;
|
||||||
import io.druid.data.input.Rows;
|
|
||||||
import io.druid.data.input.impl.MapInputRowParser;
|
import io.druid.data.input.impl.MapInputRowParser;
|
||||||
|
|
||||||
import io.druid.guice.annotations.Json;
|
import io.druid.guice.annotations.Json;
|
||||||
import io.druid.guice.annotations.Smile;
|
import io.druid.guice.annotations.Smile;
|
||||||
import java.io.InputStream;
|
import io.druid.server.metrics.EventReceiverFirehoseMetric;
|
||||||
|
import io.druid.server.metrics.EventReceiverFirehoseRegister;
|
||||||
|
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.ws.rs.Consumes;
|
import javax.ws.rs.Consumes;
|
||||||
import javax.ws.rs.POST;
|
import javax.ws.rs.POST;
|
||||||
|
@ -48,6 +48,7 @@ import javax.ws.rs.core.Context;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -70,6 +71,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
|
||||||
private final Optional<ChatHandlerProvider> chatHandlerProvider;
|
private final Optional<ChatHandlerProvider> chatHandlerProvider;
|
||||||
private final ObjectMapper jsonMapper;
|
private final ObjectMapper jsonMapper;
|
||||||
private final ObjectMapper smileMapper;
|
private final ObjectMapper smileMapper;
|
||||||
|
private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister;
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
public EventReceiverFirehoseFactory(
|
public EventReceiverFirehoseFactory(
|
||||||
|
@ -77,7 +79,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
|
||||||
@JsonProperty("bufferSize") Integer bufferSize,
|
@JsonProperty("bufferSize") Integer bufferSize,
|
||||||
@JacksonInject ChatHandlerProvider chatHandlerProvider,
|
@JacksonInject ChatHandlerProvider chatHandlerProvider,
|
||||||
@JacksonInject @Json ObjectMapper jsonMapper,
|
@JacksonInject @Json ObjectMapper jsonMapper,
|
||||||
@JacksonInject @Smile ObjectMapper smileMapper
|
@JacksonInject @Smile ObjectMapper smileMapper,
|
||||||
|
@JacksonInject EventReceiverFirehoseRegister eventReceiverFirehoseRegister
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
Preconditions.checkNotNull(serviceName, "serviceName");
|
Preconditions.checkNotNull(serviceName, "serviceName");
|
||||||
|
@ -87,13 +90,13 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
|
||||||
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
|
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
|
||||||
this.jsonMapper = jsonMapper;
|
this.jsonMapper = jsonMapper;
|
||||||
this.smileMapper = smileMapper;
|
this.smileMapper = smileMapper;
|
||||||
|
this.eventReceiverFirehoseRegister = eventReceiverFirehoseRegister;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Firehose connect(MapInputRowParser firehoseParser) throws IOException
|
public Firehose connect(MapInputRowParser firehoseParser) throws IOException
|
||||||
{
|
{
|
||||||
log.info("Connecting firehose: %s", serviceName);
|
log.info("Connecting firehose: %s", serviceName);
|
||||||
|
|
||||||
final EventReceiverFirehose firehose = new EventReceiverFirehose(firehoseParser);
|
final EventReceiverFirehose firehose = new EventReceiverFirehose(firehoseParser);
|
||||||
|
|
||||||
if (chatHandlerProvider.isPresent()) {
|
if (chatHandlerProvider.isPresent()) {
|
||||||
|
@ -106,6 +109,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
|
||||||
log.info("No chathandler detected");
|
log.info("No chathandler detected");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
eventReceiverFirehoseRegister.register(serviceName, firehose);
|
||||||
|
|
||||||
return firehose;
|
return firehose;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,7 +126,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
|
||||||
return bufferSize;
|
return bufferSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
public class EventReceiverFirehose implements ChatHandler, Firehose
|
public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiverFirehoseMetric
|
||||||
{
|
{
|
||||||
private final BlockingQueue<InputRow> buffer;
|
private final BlockingQueue<InputRow> buffer;
|
||||||
private final MapInputRowParser parser;
|
private final MapInputRowParser parser;
|
||||||
|
@ -243,12 +248,25 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getCurrentBufferSize()
|
||||||
|
{
|
||||||
|
// ArrayBlockingQueue's implementation of size() is thread-safe, so we can use that
|
||||||
|
return buffer.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getCapacity()
|
||||||
|
{
|
||||||
|
return bufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException
|
public void close() throws IOException
|
||||||
{
|
{
|
||||||
log.info("Firehose closing.");
|
log.info("Firehose closing.");
|
||||||
closed = true;
|
closed = true;
|
||||||
|
eventReceiverFirehoseRegister.unregister(serviceName);
|
||||||
if (chatHandlerProvider.isPresent()) {
|
if (chatHandlerProvider.isPresent()) {
|
||||||
chatHandlerProvider.get().unregister(serviceName);
|
chatHandlerProvider.get().unregister(serviceName);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.metrics;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An EventReceiverFirehoseMetric is an object with metrics about EventReceiverFirehose objects.
|
||||||
|
* It is not likely that anything other than an EventReceiverFirehose actually implements this.
|
||||||
|
* This interface is not part of the public API and backwards incompatible changes can occur without
|
||||||
|
* requiring a major (or even minor) version change.
|
||||||
|
* The interface's primary purpose is to be able to share metrics via the EventReceiverFirehoseRegister
|
||||||
|
* without exposing the entire EventReceiverFirehose
|
||||||
|
*/
|
||||||
|
public interface EventReceiverFirehoseMetric
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Return the current number of {@link io.druid.data.input.InputRow} that are stored in the buffer.
|
||||||
|
*/
|
||||||
|
int getCurrentBufferSize();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the capacity of the buffer.
|
||||||
|
*/
|
||||||
|
int getCapacity();
|
||||||
|
}
|
|
@ -0,0 +1,61 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.metrics;
|
||||||
|
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
|
import com.metamx.metrics.AbstractMonitor;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class EventReceiverFirehoseMonitor extends AbstractMonitor
|
||||||
|
{
|
||||||
|
|
||||||
|
private final EventReceiverFirehoseRegister register;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public EventReceiverFirehoseMonitor(
|
||||||
|
EventReceiverFirehoseRegister eventReceiverFirehoseRegister
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.register = eventReceiverFirehoseRegister;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean doMonitor(ServiceEmitter emitter)
|
||||||
|
{
|
||||||
|
for (Map.Entry<String, EventReceiverFirehoseMetric> entry : register.getMetrics()) {
|
||||||
|
final String serviceName = entry.getKey();
|
||||||
|
final EventReceiverFirehoseMetric metric = entry.getValue();
|
||||||
|
|
||||||
|
final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder()
|
||||||
|
.setDimension("serviceName", serviceName)
|
||||||
|
.setDimension(
|
||||||
|
"bufferCapacity",
|
||||||
|
String.valueOf(metric.getCapacity())
|
||||||
|
);
|
||||||
|
|
||||||
|
emitter.emit(builder.build("ingest/events/buffered", metric.getCurrentBufferSize()));
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,56 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.metrics;
|
||||||
|
|
||||||
|
import com.metamx.common.ISE;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
|
public class EventReceiverFirehoseRegister
|
||||||
|
{
|
||||||
|
|
||||||
|
private static final Logger log = new Logger(EventReceiverFirehoseRegister.class);
|
||||||
|
|
||||||
|
private final ConcurrentMap<String, EventReceiverFirehoseMetric> metrics = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public void register(String serviceName, EventReceiverFirehoseMetric metric)
|
||||||
|
{
|
||||||
|
log.info("Registering EventReceiverFirehoseMetric for service [%s]", serviceName);
|
||||||
|
if (metrics.putIfAbsent(serviceName, metric) != null) {
|
||||||
|
throw new ISE("Service [%s] is already registered!", serviceName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Iterable<Map.Entry<String, EventReceiverFirehoseMetric>> getMetrics()
|
||||||
|
{
|
||||||
|
return metrics.entrySet();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unregister(String serviceName)
|
||||||
|
{
|
||||||
|
log.info("Unregistering EventReceiverFirehoseMetric for service [%s]", serviceName);
|
||||||
|
if (metrics.remove(serviceName) == null) {
|
||||||
|
log.warn("Unregistering a non-exist service. Service [%s] never exists.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -33,6 +33,7 @@ import com.metamx.metrics.MonitorScheduler;
|
||||||
import io.druid.concurrent.Execs;
|
import io.druid.concurrent.Execs;
|
||||||
import io.druid.guice.DruidBinders;
|
import io.druid.guice.DruidBinders;
|
||||||
import io.druid.guice.JsonConfigProvider;
|
import io.druid.guice.JsonConfigProvider;
|
||||||
|
import io.druid.guice.LazySingleton;
|
||||||
import io.druid.guice.ManageLifecycle;
|
import io.druid.guice.ManageLifecycle;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -59,6 +60,8 @@ public class MetricsModule implements Module
|
||||||
|
|
||||||
DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum.
|
DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum.
|
||||||
|
|
||||||
|
binder.bind(EventReceiverFirehoseRegister.class).in(LazySingleton.class);
|
||||||
|
|
||||||
// Instantiate eagerly so that we get everything registered and put into the Lifecycle
|
// Instantiate eagerly so that we get everything registered and put into the Lifecycle
|
||||||
binder.bind(Key.get(MonitorScheduler.class, Names.named("ForTheEagerness")))
|
binder.bind(Key.get(MonitorScheduler.class, Names.named("ForTheEagerness")))
|
||||||
.to(MonitorScheduler.class)
|
.to(MonitorScheduler.class)
|
||||||
|
|
|
@ -0,0 +1,220 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.segment.realtime.firehose;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.metamx.common.ISE;
|
||||||
|
import io.druid.concurrent.Execs;
|
||||||
|
import io.druid.data.input.impl.DimensionsSpec;
|
||||||
|
import io.druid.data.input.impl.JSONParseSpec;
|
||||||
|
import io.druid.data.input.impl.MapInputRowParser;
|
||||||
|
import io.druid.data.input.impl.TimestampSpec;
|
||||||
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
|
import io.druid.server.metrics.EventReceiverFirehoseMetric;
|
||||||
|
import io.druid.server.metrics.EventReceiverFirehoseRegister;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
public class EventReceiverFirehoseTest
|
||||||
|
{
|
||||||
|
private static final int CAPACITY = 300;
|
||||||
|
private static final int NUM_EVENTS = 100;
|
||||||
|
private static final String SERVICE_NAME = "test_firehose";
|
||||||
|
|
||||||
|
private final String inputRow = "[{\n"
|
||||||
|
+ " \"timestamp\":123,\n"
|
||||||
|
+ " \"d1\":\"v1\"\n"
|
||||||
|
+ "}]";
|
||||||
|
|
||||||
|
private EventReceiverFirehoseFactory eventReceiverFirehoseFactory;
|
||||||
|
private EventReceiverFirehoseFactory.EventReceiverFirehose firehose;
|
||||||
|
private EventReceiverFirehoseRegister register = new EventReceiverFirehoseRegister();
|
||||||
|
private HttpServletRequest req;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception
|
||||||
|
{
|
||||||
|
req = EasyMock.createMock(HttpServletRequest.class);
|
||||||
|
eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(
|
||||||
|
SERVICE_NAME,
|
||||||
|
CAPACITY,
|
||||||
|
null,
|
||||||
|
new DefaultObjectMapper(),
|
||||||
|
new DefaultObjectMapper(),
|
||||||
|
register
|
||||||
|
);
|
||||||
|
firehose = (EventReceiverFirehoseFactory.EventReceiverFirehose) eventReceiverFirehoseFactory.connect(
|
||||||
|
new MapInputRowParser(
|
||||||
|
new JSONParseSpec(
|
||||||
|
new TimestampSpec(
|
||||||
|
"timestamp",
|
||||||
|
"auto",
|
||||||
|
null
|
||||||
|
), new DimensionsSpec(ImmutableList.of("d1"), null, null)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSingleThread() throws IOException
|
||||||
|
{
|
||||||
|
EasyMock.expect(req.getContentType()).andReturn("application/json").times(NUM_EVENTS);
|
||||||
|
EasyMock.replay(req);
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_EVENTS; ++i) {
|
||||||
|
final InputStream inputStream = IOUtils.toInputStream(inputRow);
|
||||||
|
firehose.addAll(inputStream, req);
|
||||||
|
Assert.assertEquals(i + 1, firehose.getCurrentBufferSize());
|
||||||
|
inputStream.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
EasyMock.verify(req);
|
||||||
|
|
||||||
|
final Iterable<Map.Entry<String, EventReceiverFirehoseMetric>> metrics = register.getMetrics();
|
||||||
|
Assert.assertEquals(1, Iterables.size(metrics));
|
||||||
|
|
||||||
|
final Map.Entry<String, EventReceiverFirehoseMetric> entry = Iterables.getLast(metrics);
|
||||||
|
Assert.assertEquals(SERVICE_NAME, entry.getKey());
|
||||||
|
Assert.assertEquals(CAPACITY, entry.getValue().getCapacity());
|
||||||
|
Assert.assertEquals(CAPACITY, firehose.getCapacity());
|
||||||
|
Assert.assertEquals(NUM_EVENTS, entry.getValue().getCurrentBufferSize());
|
||||||
|
Assert.assertEquals(NUM_EVENTS, firehose.getCurrentBufferSize());
|
||||||
|
|
||||||
|
for (int i = NUM_EVENTS - 1; i >= 0; --i) {
|
||||||
|
Assert.assertTrue(firehose.hasMore());
|
||||||
|
Assert.assertNotNull(firehose.nextRow());
|
||||||
|
Assert.assertEquals(i, firehose.getCurrentBufferSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(CAPACITY, entry.getValue().getCapacity());
|
||||||
|
Assert.assertEquals(CAPACITY, firehose.getCapacity());
|
||||||
|
Assert.assertEquals(0, entry.getValue().getCurrentBufferSize());
|
||||||
|
Assert.assertEquals(0, firehose.getCurrentBufferSize());
|
||||||
|
|
||||||
|
firehose.close();
|
||||||
|
Assert.assertFalse(firehose.hasMore());
|
||||||
|
Assert.assertEquals(0, Iterables.size(register.getMetrics()));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleThreads() throws InterruptedException, IOException, TimeoutException, ExecutionException
|
||||||
|
{
|
||||||
|
EasyMock.expect(req.getContentType()).andReturn("application/json").times(2 * NUM_EVENTS);
|
||||||
|
EasyMock.replay(req);
|
||||||
|
|
||||||
|
final ExecutorService executorService = Execs.singleThreaded("single_thread");
|
||||||
|
final Future future = executorService.submit(
|
||||||
|
new Callable<Boolean>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Boolean call() throws Exception
|
||||||
|
{
|
||||||
|
for (int i = 0; i < NUM_EVENTS; ++i) {
|
||||||
|
final InputStream inputStream = IOUtils.toInputStream(inputRow);
|
||||||
|
firehose.addAll(inputStream, req);
|
||||||
|
inputStream.close();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_EVENTS; ++i) {
|
||||||
|
final InputStream inputStream = IOUtils.toInputStream(inputRow);
|
||||||
|
firehose.addAll(inputStream, req);
|
||||||
|
inputStream.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
future.get(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
EasyMock.verify(req);
|
||||||
|
|
||||||
|
final Iterable<Map.Entry<String, EventReceiverFirehoseMetric>> metrics = register.getMetrics();
|
||||||
|
Assert.assertEquals(1, Iterables.size(metrics));
|
||||||
|
|
||||||
|
final Map.Entry<String, EventReceiverFirehoseMetric> entry = Iterables.getLast(metrics);
|
||||||
|
|
||||||
|
Assert.assertEquals(SERVICE_NAME, entry.getKey());
|
||||||
|
Assert.assertEquals(CAPACITY, entry.getValue().getCapacity());
|
||||||
|
Assert.assertEquals(CAPACITY, firehose.getCapacity());
|
||||||
|
Assert.assertEquals(2 * NUM_EVENTS, entry.getValue().getCurrentBufferSize());
|
||||||
|
Assert.assertEquals(2 * NUM_EVENTS, firehose.getCurrentBufferSize());
|
||||||
|
|
||||||
|
for (int i = 2 * NUM_EVENTS - 1; i >= 0; --i) {
|
||||||
|
Assert.assertTrue(firehose.hasMore());
|
||||||
|
Assert.assertNotNull(firehose.nextRow());
|
||||||
|
Assert.assertEquals(i, firehose.getCurrentBufferSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(CAPACITY, entry.getValue().getCapacity());
|
||||||
|
Assert.assertEquals(CAPACITY, firehose.getCapacity());
|
||||||
|
Assert.assertEquals(0, entry.getValue().getCurrentBufferSize());
|
||||||
|
Assert.assertEquals(0, firehose.getCurrentBufferSize());
|
||||||
|
|
||||||
|
firehose.close();
|
||||||
|
Assert.assertFalse(firehose.hasMore());
|
||||||
|
Assert.assertEquals(0, Iterables.size(register.getMetrics()));
|
||||||
|
|
||||||
|
executorService.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = ISE.class)
|
||||||
|
public void testDuplicateRegistering() throws IOException
|
||||||
|
{
|
||||||
|
EventReceiverFirehoseFactory eventReceiverFirehoseFactory2 = new EventReceiverFirehoseFactory(
|
||||||
|
SERVICE_NAME,
|
||||||
|
CAPACITY,
|
||||||
|
null,
|
||||||
|
new DefaultObjectMapper(),
|
||||||
|
new DefaultObjectMapper(),
|
||||||
|
register
|
||||||
|
);
|
||||||
|
EventReceiverFirehoseFactory.EventReceiverFirehose firehose2 =
|
||||||
|
(EventReceiverFirehoseFactory.EventReceiverFirehose) eventReceiverFirehoseFactory2
|
||||||
|
.connect(
|
||||||
|
new MapInputRowParser(
|
||||||
|
new JSONParseSpec(
|
||||||
|
new TimestampSpec(
|
||||||
|
"timestamp",
|
||||||
|
"auto",
|
||||||
|
null
|
||||||
|
), new DimensionsSpec(ImmutableList.of("d1"), null, null)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue