Add maxIdleTime option to EventReceiverFirehose (#5997)

This commit is contained in:
Hongze Zhang 2018-09-18 04:50:56 +08:00 committed by Jonathan Wei
parent dabaf4caf8
commit 2fac6743d4
3 changed files with 162 additions and 0 deletions

View File

@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -74,6 +75,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -88,9 +90,11 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class); private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
private static final int DEFAULT_BUFFER_SIZE = 100_000; private static final int DEFAULT_BUFFER_SIZE = 100_000;
private static final long DEFAULT_MAX_IDLE_TIME = Long.MAX_VALUE;
private final String serviceName; private final String serviceName;
private final int bufferSize; private final int bufferSize;
private final long maxIdleTime;
private final Optional<ChatHandlerProvider> chatHandlerProvider; private final Optional<ChatHandlerProvider> chatHandlerProvider;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper; private final ObjectMapper smileMapper;
@ -101,6 +105,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
public EventReceiverFirehoseFactory( public EventReceiverFirehoseFactory(
@JsonProperty("serviceName") String serviceName, @JsonProperty("serviceName") String serviceName,
@JsonProperty("bufferSize") Integer bufferSize, @JsonProperty("bufferSize") Integer bufferSize,
@JsonProperty("maxIdleTime") Long maxIdleTime,
@JacksonInject ChatHandlerProvider chatHandlerProvider, @JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject @Json ObjectMapper jsonMapper, @JacksonInject @Json ObjectMapper jsonMapper,
@JacksonInject @Smile ObjectMapper smileMapper, @JacksonInject @Smile ObjectMapper smileMapper,
@ -112,6 +117,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
this.serviceName = serviceName; this.serviceName = serviceName;
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize; this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.maxIdleTime = maxIdleTime == null || maxIdleTime <= 0 ?
DEFAULT_MAX_IDLE_TIME : maxIdleTime;
this.chatHandlerProvider = Optional.ofNullable(chatHandlerProvider); this.chatHandlerProvider = Optional.ofNullable(chatHandlerProvider);
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper; this.smileMapper = smileMapper;
@ -155,9 +162,16 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
return bufferSize; return bufferSize;
} }
@JsonProperty
public long getMaxIdleTime()
{
return maxIdleTime;
}
public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiverFirehoseMetric public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiverFirehoseMetric
{ {
private final ScheduledExecutorService exec; private final ScheduledExecutorService exec;
private final ExecutorService idleDetector;
private final BlockingQueue<InputRow> buffer; private final BlockingQueue<InputRow> buffer;
private final InputRowParser<Map<String, Object>> parser; private final InputRowParser<Map<String, Object>> parser;
@ -168,12 +182,29 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
private final AtomicLong bytesReceived = new AtomicLong(0); private final AtomicLong bytesReceived = new AtomicLong(0);
private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0); private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0);
private final ConcurrentMap<String, Long> producerSequences = new ConcurrentHashMap<>(); private final ConcurrentMap<String, Long> producerSequences = new ConcurrentHashMap<>();
private final Stopwatch idleWatch = Stopwatch.createUnstarted();
public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser) public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
{ {
this.buffer = new ArrayBlockingQueue<>(bufferSize); this.buffer = new ArrayBlockingQueue<>(bufferSize);
this.parser = parser; this.parser = parser;
exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d"); exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d");
idleDetector = Execs.singleThreaded("event-receiver-firehose-idle-detector-%d");
idleDetector.submit(() -> {
long idled;
try {
while ((idled = idleWatch.elapsed(TimeUnit.MILLISECONDS)) < maxIdleTime) {
Thread.sleep(maxIdleTime - idled);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
log.info("Firehose has been idle for %d ms, closing.", idled);
close();
});
idleWatch.start();
} }
@POST @POST
@ -185,6 +216,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
@Context final HttpServletRequest req @Context final HttpServletRequest req
) )
{ {
idleWatch.reset();
idleWatch.start();
Access accessResult = AuthorizationUtils.authorizeResourceAction( Access accessResult = AuthorizationUtils.authorizeResourceAction(
req, req,
new ResourceAction( new ResourceAction(
@ -328,6 +361,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
chatHandlerProvider.get().unregister(serviceName); chatHandlerProvider.get().unregister(serviceName);
} }
exec.shutdown(); exec.shutdown();
idleDetector.shutdown();
idleWatch.stop();
} }
} }

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.firehose;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import org.apache.druid.server.metrics.EventReceiverFirehoseRegister;
import org.apache.druid.server.security.AllowAllAuthenticator;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthTestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.servlet.http.HttpServletRequest;
import java.util.Locale;
public class EventReceiverFirehostIdleTest
{
private static final int CAPACITY = 300;
private static final long MAX_IDLE_TIME = 5_000L;
private static final String SERVICE_NAME = "test_firehose";
private final String inputRow = "[{\n"
+ " \"timestamp\":123,\n"
+ " \"d1\":\"v1\"\n"
+ "}]";
private EventReceiverFirehoseFactory eventReceiverFirehoseFactory;
private EventReceiverFirehoseFactory.EventReceiverFirehose firehose;
private EventReceiverFirehoseRegister register = new EventReceiverFirehoseRegister();
private HttpServletRequest req;
@Before
public void setUp() throws Exception
{
req = EasyMock.createMock(HttpServletRequest.class);
eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(
SERVICE_NAME,
CAPACITY,
MAX_IDLE_TIME,
null,
new DefaultObjectMapper(),
new DefaultObjectMapper(),
register,
AuthTestUtils.TEST_AUTHORIZER_MAPPER
);
firehose = (EventReceiverFirehoseFactory.EventReceiverFirehose) eventReceiverFirehoseFactory.connect(
new MapInputRowParser(
new JSONParseSpec(
new TimestampSpec(
"timestamp",
"auto",
null
), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null, null),
null,
null
)
),
null
);
}
@Test(timeout = 40_000L)
public void testIdle() throws Exception
{
Thread.sleep(8_000L);
Assert.assertTrue(firehose.isClosed());
}
@Test(timeout = 40_000L)
public void testNotIdle() throws Exception
{
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED))
.andReturn(null)
.anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH))
.andReturn(null)
.anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
.andReturn(AllowAllAuthenticator.ALLOW_ALL_RESULT)
.anyTimes();
EasyMock.expect(req.getHeader("X-Firehose-Producer-Id")).andReturn(null).anyTimes();
EasyMock.expect(req.getContentType()).andReturn("application/json").anyTimes();
req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(req);
final int checks = 5;
for (int i = 0; i < checks; i++) {
Assert.assertFalse(firehose.isClosed());
System.out.printf(Locale.ENGLISH, "Check %d/%d passed\n", i + 1, checks);
firehose.addAll(IOUtils.toInputStream(inputRow), req);
Thread.sleep(3_000L);
}
Thread.sleep(5_000L);
Assert.assertTrue(firehose.isClosed());
}
}

View File

@ -57,6 +57,7 @@ public class EventReceiverFirehoseTest
{ {
private static final int CAPACITY = 300; private static final int CAPACITY = 300;
private static final int NUM_EVENTS = 100; private static final int NUM_EVENTS = 100;
private static final long MAX_IDLE_TIME = Long.MAX_VALUE;
private static final String SERVICE_NAME = "test_firehose"; private static final String SERVICE_NAME = "test_firehose";
private final String inputRow = "[{\n" private final String inputRow = "[{\n"
@ -76,6 +77,7 @@ public class EventReceiverFirehoseTest
eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory( eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(
SERVICE_NAME, SERVICE_NAME,
CAPACITY, CAPACITY,
MAX_IDLE_TIME,
null, null,
new DefaultObjectMapper(), new DefaultObjectMapper(),
new DefaultObjectMapper(), new DefaultObjectMapper(),
@ -217,6 +219,7 @@ public class EventReceiverFirehoseTest
EventReceiverFirehoseFactory eventReceiverFirehoseFactory2 = new EventReceiverFirehoseFactory( EventReceiverFirehoseFactory eventReceiverFirehoseFactory2 = new EventReceiverFirehoseFactory(
SERVICE_NAME, SERVICE_NAME,
CAPACITY, CAPACITY,
MAX_IDLE_TIME,
null, null,
new DefaultObjectMapper(), new DefaultObjectMapper(),
new DefaultObjectMapper(), new DefaultObjectMapper(),