Fix and document concurrency of EventReceiverFirehose and TimedShutoffFirehose; Refine concurrency specification of Firehose (#7038)

#### `EventReceiverFirehoseFactory`
Fixed several concurrency bugs in `EventReceiverFirehoseFactory`:
 - Race condition over putting an entry into `producerSequences` in `checkProducerSequence()`.
 - `Stopwatch` used to measure time across threads, but it's a non-thread-safe class.
 - Use `System.nanoTime()` instead of `System.currentTimeMillis()` because the latter are [not suitable](https://stackoverflow.com/a/351571/648955)  for measuring time intervals.
 - `close()` was not synchronized by could be called from multiple threads concurrently.

Removed unnecessary `readLock` (protecting `hasMore()` and `nextRow()` which are always called from a single thread). Removed unnecessary `volatile` modifiers.

Documented threading model and concurrent control flow of `EventReceiverFirehose` instances.

**Important:** please read the updated Javadoc for `EventReceiverFirehose.addAll()`. It allows events from different requests (batches) to be interleaved in the buffer. Is this OK?

#### `TimedShutoffFirehoseFactory`
- Fixed a race condition that was possible because `close()` that was not properly synchronized.

Documented threading model and concurrent control flow of `TimedShutoffFirehose` instances.

#### `Firehose`

Refined concurrency contract of `Firehose` based on `EventReceiverFirehose` implementation. Importantly, now it states that `close()` doesn't affect `hasMore()` and `nextRow()` and could be called concurrently with them. In other words, specified that `close()` is for "row supply" side rather than "row consume" side. However, I didn't check that other `Firehose` implementatations adhere to this contract.

<hr>

This issue is the result of reviewing `EventReceiverFirehose` and `TimedShutoffFirehose` using [this checklist](https://medium.com/@leventov/code-review-checklist-java-concurrency-49398c326154).
This commit is contained in:
Roman Leventov 2019-03-04 18:50:03 -03:00 committed by GitHub
parent 7bf1ee4dc0
commit 10c9f6d708
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 524 additions and 215 deletions

View File

@ -46,6 +46,7 @@
<inspection_tool class="EqualsUsesNonFinalVariable" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="EqualsWhichDoesntCheckParameterClass" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="EqualsWithItself" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="FieldAccessNotGuarded" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="FieldCanBeLocal" enabled="true" level="WARNING" enabled_by_default="true">
<option name="EXCLUDE_ANNOS">
<value>
@ -119,7 +120,7 @@
<inspection_tool class="NumberEquality" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="ObjectEquality" enabled="true" level="WARNING" enabled_by_default="true">
<option name="m_ignoreEnums" value="true" />
<option name="m_ignoreClassObjects" value="false" />
<option name="m_ignoreClassObjects" value="true" />
<option name="m_ignorePrivateConstructors" value="false" />
</inspection_tool>
<inspection_tool class="ObjectEqualsNull" enabled="true" level="ERROR" enabled_by_default="true" />
@ -356,6 +357,5 @@
<option name="ADD_SERVLET_TO_ENTRIES" value="true" />
<option name="ADD_NONJAVA_TO_ENTRIES" value="true" />
</inspection_tool>
<inspection_tool class="FieldAccessNotGuarded" enabled="true" level="ERROR" enabled_by_default="true" />
</profile>
</component>

View File

@ -23,9 +23,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* A synchronization tool for lifecycled objects (see {@link org.apache.druid.java.util.common.lifecycle.Lifecycle}, that need
* happens-before between start() and other methods and/or to check that the object was successfully started in other
* methods.
* A synchronization tool for lifecycled objects (see {@link org.apache.druid.java.util.common.lifecycle.Lifecycle},
* that need happens-before between start() and other methods and/or to check that the object was successfully started
* in other methods.
*
* Guarantees in terms of JMM: happens-before between {@link #exitStart()} and {@link #awaitStarted()},
* exitStart() and {@link #canStop()}, if it returns {@code true}.

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.concurrent;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public final class Threads
{
/**
* Equivalent of {@link Thread#sleep(long)} with arguments and semantics of timed wait methods in classes from {@link
* java.util.concurrent} (like {@link java.util.concurrent.Semaphore#tryAcquire(long, TimeUnit)},
* {@link java.util.concurrent.locks.Lock#tryLock(long, TimeUnit)}, etc.): if the sleepTime argument is negative or
* zero, the method returns immediately. {@link Thread#sleep}, on the contrary, throws an IllegalArgumentException if
* the argument is negative and attempts to unschedule the thread if the argument is zero.
*
* @throws InterruptedException if the current thread is interrupted when this method is called or during sleeping.
*/
public static void sleepFor(long sleepTime, TimeUnit unit) throws InterruptedException
{
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (sleepTime <= 0) {
return;
}
long sleepTimeLimitNanos = System.nanoTime() + unit.toNanos(sleepTime);
while (true) {
long sleepTimeoutNanos = sleepTimeLimitNanos - System.nanoTime();
if (sleepTimeoutNanos <= 0) {
return;
}
LockSupport.parkNanos(sleepTimeoutNanos);
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
}
private Threads() {}
}

View File

@ -23,19 +23,24 @@ import org.apache.druid.guice.annotations.ExtensionPoint;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
/**
* This is an interface that holds onto the stream of incoming data. Realtime data ingestion is built around this
* abstraction. In order to add a new type of source for realtime data ingestion, all you need to do is implement
* one of these and register it with the Main.
* abstraction.
*
* This object acts a lot like an Iterator, but it doesn't extend the Iterator interface because it extends
* Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this
* gets passed around as an Iterator.
* <p>
* The implementation of this interface only needs to be minimally thread-safe. The three methods ##hasMore(),
* ##nextRow() and ##commit() are all called from the same thread. ##commit(), however, returns a callback
* which will be called on another thread, so the operations inside of that callback must be thread-safe.
* Closeable and it is very important that the {@link #close()} method doesn't get forgotten, which is easy to do if
* this gets passed around as an Iterator. Note that {@link #close()} doesn't cut the stream of rows for Firehose users
* immediately, but rather stops the supply of new rows into internal buffers. {@link #hasMore()} and {@link #nextRow()}
* are expected to operate for some time after (or concurrently with) {@link #close()} until the buffered events (if
* any) run out.
*
* Concurrency:
* The three methods {@link #hasMore()}, {@link #nextRow()} and {@link #commit()} are all called from the same thread.
* {@link #commit()}, however, returns a callback which will be called on another thread. {@link #close()} might be
* called concurrently from a thread different from the thread calling {@link #hasMore()}, {@link #nextRow()} and {@link
* #commit()}.
* </p>
*/
@ExtensionPoint
@ -43,8 +48,8 @@ public interface Firehose extends Closeable
{
/**
* Returns whether there are more rows to process. This is used to indicate that another item is immediately
* available via ##nextRow(). Thus, if the stream is still available but there are no new messages on it, this call
* should block until a new message is available.
* available via {@link #nextRow()}. Thus, if the stream is still available but there are no new messages on it, this
* call should block until a new message is available.
*
* If something happens such that the stream is no longer available, this should return false.
*
@ -77,8 +82,22 @@ public interface Firehose extends Closeable
* A simple implementation of this interface might do nothing when run() is called
* (in which case the same do-nothing instance can be returned every time), or
* a more complex implementation might clean up temporary resources that are no longer needed
* because of InputRows delivered by prior calls to ##nextRow().
* because of InputRows delivered by prior calls to {@link #nextRow()}.
* </p>
*/
Runnable commit();
/**
* Closes the "ingestion side" of the Firehose, potentially concurrently with calls to {@link #hasMore()}, {@link
* #nextRow()} and {@link #commit()} being made from a different thread. {@link #hasMore()} and {@link #nextRow()}
* continue to work after close(), but since the ingestion side is closed rows will eventually run out.
*
* The effects of calling run() on the {@link Runnable} object returned from {@link #commit()} (in other words,
* doing the commit) concurrently or after close() are unspecified: commit may not be performed silently (that is,
* run() call completes without an Exception, but the commit is not actually done), or a error may result. Note that
* {@link #commit()} method itself can be called concurrently with close(), but it doesn't make much sense, because
* run() on the returned Runnable then can't be called.
*/
@Override
void close() throws IOException;
}

View File

@ -42,9 +42,10 @@ import java.io.Closeable;
* Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this
* gets passed around as an Iterator.
*
* The implementation of this interface only needs to be minimally thread-safe. The methods ##start(), ##advance(),
* ##currRow() and ##makeCommitter() are all called from the same thread. ##makeCommitter(), however, returns a callback
* which will be called on another thread, so the operations inside of that callback must be thread-safe.
* The implementation of this interface only needs to be minimally thread-safe. The methods {@link #start()}, {@link
* #advance()}, {@link #currRow()} and {@link #makeCommitter()} are all called from the same thread. {@link
* #makeCommitter()}, however, returns a callback which will be called on another thread, so the operations inside of
* that callback must be thread-safe.
*/
@ExtensionPoint
public interface FirehoseV2 extends Closeable

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.utils;
import java.io.Closeable;
import java.io.IOException;
/**
* Methods in this class could have belonged to {@link org.apache.druid.java.util.common.io.Closer}, but not editing
* that class to keep its source close to Guava source.
*/
public final class CloseableUtils
{
/**
* Call method instead of code like
*
* first.close();
* second.close();
*
* to have safety of {@link org.apache.druid.java.util.common.io.Closer}, but without associated boilerplate code
* of creating a Closer and registering objects in it.
*/
public static void closeBoth(Closeable first, Closeable second) throws IOException
{
//noinspection EmptyTryBlock
try (Closeable ignore1 = second; Closeable ignore2 = first) {
// piggy-back try-with-resources semantics
}
}
private CloseableUtils() {}
}

View File

@ -191,8 +191,9 @@ When using this firehose, events can be sent by submitting a POST request to the
|property|description|required?|
|--------|-----------|---------|
|type|This should be "receiver"|yes|
|serviceName|name used to announce the event receiver service endpoint|yes|
|bufferSize| size of buffer used by firehose to store events|no default(100000)|
|serviceName|Name used to announce the event receiver service endpoint|yes|
|maxIdleTime|A firehose is automatically shut down after not receiving any events for this period of time, in milliseconds. If not specified, a firehose is never shut down due to being idle. Zero and negative values have the same effect.|no|
|bufferSize|Size of buffer used by firehose to store events|no, default is 100000|
Shut down time for EventReceiverFirehose can be specified by submitting a POST request to

View File

@ -28,10 +28,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CountingInputStream;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.concurrent.Threads;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
@ -39,7 +40,6 @@ import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.metrics.EventReceiverFirehoseMetric;
import org.apache.druid.server.metrics.EventReceiverFirehoseRegister;
@ -50,6 +50,7 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.utils.Runnables;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@ -70,12 +71,9 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -85,16 +83,29 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowParser<Map<String, Object>>>
{
private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
public static final int MAX_FIREHOSE_PRODUCERS = 10_000;
private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
private static final int DEFAULT_BUFFER_SIZE = 100_000;
private static final long DEFAULT_MAX_IDLE_TIME = Long.MAX_VALUE;
/**
* A "poison pill" object for {@link EventReceiverFirehose}'s internal buffer.
*/
private static final Object FIREHOSE_CLOSED = new Object();
private final String serviceName;
private final int bufferSize;
private final long maxIdleTime;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
/**
* Doesn't really support max idle times finer than 1 second due to how {@link
* EventReceiverFirehose#delayedCloseExecutor} is implemented, see a comment inside {@link
* EventReceiverFirehose#createDelayedCloseExecutor()}. This aspect is not reflected in docs because it's unlikely
* that anybody configures or cares about finer max idle times, and also because this is an implementation detail of
* {@link EventReceiverFirehose} that may change in the future.
*/
private final long maxIdleTimeMillis;
private final @Nullable ChatHandlerProvider chatHandlerProvider;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister;
@ -104,7 +115,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
public EventReceiverFirehoseFactory(
@JsonProperty("serviceName") String serviceName,
@JsonProperty("bufferSize") Integer bufferSize,
@JsonProperty("maxIdleTime") Long maxIdleTime,
// Keeping the legacy 'maxIdleTime' property name for backward compatibility. When the project is updated to
// Jackson 2.9 it could be changed, see https://github.com/apache/incubator-druid/issues/7152
@JsonProperty("maxIdleTime") @Nullable Long maxIdleTimeMillis,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject @Json ObjectMapper jsonMapper,
@JacksonInject @Smile ObjectMapper smileMapper,
@ -116,9 +129,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
this.serviceName = serviceName;
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.maxIdleTime = maxIdleTime == null || maxIdleTime <= 0 ?
DEFAULT_MAX_IDLE_TIME : maxIdleTime;
this.chatHandlerProvider = Optional.ofNullable(chatHandlerProvider);
this.maxIdleTimeMillis = (maxIdleTimeMillis == null || maxIdleTimeMillis <= 0) ? Long.MAX_VALUE : maxIdleTimeMillis;
this.chatHandlerProvider = chatHandlerProvider;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.eventReceiverFirehoseRegister = eventReceiverFirehoseRegister;
@ -134,12 +146,12 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
log.info("Connecting firehose: %s", serviceName);
final EventReceiverFirehose firehose = new EventReceiverFirehose(firehoseParser);
if (chatHandlerProvider.isPresent()) {
log.info("Found chathandler of class[%s]", chatHandlerProvider.get().getClass().getName());
chatHandlerProvider.get().register(serviceName, firehose);
if (chatHandlerProvider != null) {
log.info("Found chathandler of class[%s]", chatHandlerProvider.getClass().getName());
chatHandlerProvider.register(serviceName, firehose);
int lastIndexOfColon = serviceName.lastIndexOf(':');
if (lastIndexOfColon > 0) {
chatHandlerProvider.get().register(serviceName.substring(lastIndexOfColon + 1), firehose);
chatHandlerProvider.register(serviceName.substring(lastIndexOfColon + 1), firehose);
}
} else {
log.warn("No chathandler detected");
@ -162,62 +174,178 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
return bufferSize;
}
@JsonProperty
public long getMaxIdleTime()
/**
* Keeping the legacy 'maxIdleTime' property name for backward compatibility. When the project is updated to Jackson
* 2.9 it could be changed, see https://github.com/apache/incubator-druid/issues/7152
*/
@JsonProperty("maxIdleTime")
public long getMaxIdleTimeMillis()
{
return maxIdleTime;
return maxIdleTimeMillis;
}
/**
* Apart from adhering to {@link Firehose} contract regarding concurrency, this class has two methods that might be
* called concurrently with any other methods and each other, from arbitrary number of threads: {@link #addAll} and
* {@link #shutdown}.
*
* Concurrent data flow: in {@link #addAll} (can be called concurrently with any other methods and other calls to
* {@link #addAll}) rows are pushed into {@link #buffer}. The single Firehose "consumer" thread calls {@link #hasMore}
* and {@link #nextRow()}, where rows are taken out from the other end of the {@link #buffer} queue.
*
* This class creates and manages one thread ({@link #delayedCloseExecutor}) for calling {@link #close()}
* asynchronously in response to a {@link #shutdown} request, or after this Firehose has been idle (no calls to {@link
* #addAll}) for {@link #maxIdleTimeMillis}.
*/
@VisibleForTesting
public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiverFirehoseMetric
{
private final ScheduledExecutorService exec;
private final ExecutorService idleDetector;
private final BlockingQueue<InputRow> buffer;
/**
* How does this thread work (and its interruption policy) is described in the comment for {@link
* #createDelayedCloseExecutor}.
*/
@GuardedBy("this")
private @Nullable Thread delayedCloseExecutor;
/**
* Contains {@link InputRow} objects, the last one is {@link #FIREHOSE_CLOSED} which is a "poison pill". Poison pill
* is used to notify the thread that calls {@link #hasMore()} and {@link #nextRow()} that the EventReceiverFirehose
* is closed without heuristic 500 ms timed blocking in a loop instead of a simple {@link BlockingQueue#take()}
* call (see {@link #hasMore} code).
*/
private final BlockingQueue<Object> buffer;
private final InputRowParser<Map<String, Object>> parser;
private final Object readLock = new Object();
private volatile InputRow nextRow = null;
/**
* This field needs to be volatile to ensure progress in {@link #addRows} method where it is read in a loop, and
* also in testing code calling {@link #isClosed()}.
*/
private volatile boolean closed = false;
private final AtomicLong bytesReceived = new AtomicLong(0);
private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0);
private final ConcurrentHashMap<String, Long> producerSequences = new ConcurrentHashMap<>();
private final Stopwatch idleWatch = Stopwatch.createUnstarted();
public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
/**
* This field and {@link #rowsRunOut} are not volatile because they are accessed only from {@link #hasMore()} and
* {@link #nextRow()} methods that are called from a single thread according to {@link Firehose} spec.
*/
private InputRow nextRow = null;
private boolean rowsRunOut = false;
private final AtomicLong bytesReceived = new AtomicLong(0);
private final AtomicLong lastBufferAddFailLoggingTimeNs = new AtomicLong(System.nanoTime());
private final ConcurrentHashMap<String, Long> producerSequences = new ConcurrentHashMap<>();
/**
* This field and {@link #requestedShutdownTimeNs} use nanoseconds instead of milliseconds not to deal with the fact
* that {@link System#currentTimeMillis()} can "go backward", e. g. due to time correction on the server.
*
* This field and {@link #requestedShutdownTimeNs} must be volatile because they are de facto lazily initialized
* fields that are used concurrently in {@link #delayedCloseExecutor} (see {@link #createDelayedCloseExecutor()}).
* If they were not volatile, NPE would be possible in {@link #delayedCloseExecutor}. See
* https://shipilev.net/blog/2016/close-encounters-of-jmm-kind/#wishful-hb-actual for explanations.
*/
private volatile Long idleCloseTimeNs = null;
private volatile Long requestedShutdownTimeNs = null;
EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
{
this.buffer = new ArrayBlockingQueue<>(bufferSize);
this.parser = parser;
exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d");
idleDetector = Execs.singleThreaded("event-receiver-firehose-idle-detector-%d");
idleDetector.submit(() -> {
long idled;
try {
while ((idled = idleWatch.elapsed(TimeUnit.MILLISECONDS)) < maxIdleTime) {
Thread.sleep(maxIdleTime - idled);
}
if (maxIdleTimeMillis != Long.MAX_VALUE) {
idleCloseTimeNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxIdleTimeMillis);
synchronized (this) {
createDelayedCloseExecutor();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
log.info("Firehose has been idle for %d ms, closing.", idled);
close();
});
idleWatch.start();
}
}
@VisibleForTesting
synchronized @Nullable Thread getDelayedCloseExecutor()
{
return delayedCloseExecutor;
}
/**
* Creates and starts a {@link #delayedCloseExecutor} thread, either right from the EventReceiverFirehose's
* constructor if {@link #maxIdleTimeMillis} is specified, or otherwise lazily from {@link #shutdown}.
*
* The thread waits until the time when the Firehose should be closed because either {@link #addAll} was not called
* for the specified max idle time (see {@link #idleCloseTimeNs}), or until the shutoff time requested last
* via {@link #shutdown} (see {@link #requestedShutdownTimeNs}), whatever is sooner. Then the thread does
* two things:
* 1. if the Firehose is already closed (or in the process of closing, but {@link #closed} flag is already set), it
* silently exits.
* 2. It checks both deadlines again:
* a) if either of them has arrived, it calls {@link #close()} and exits.
* b) otherwise, it waits until the nearest deadline again, and so on in a loop.
*
* This way the thread works predictably and robustly regardless of how both deadlines change (for example, shutoff
* time specified via {@link #shutdown} may jump in both directions).
*
* Other methods notify {@link #delayedCloseExecutor} that the Firehose state in some way that is important for this
* thread (that is, when {@link #close()} is called, {@link #delayedCloseExecutor} is no longer needed and should
* exit as soon as possible to release system resources; when {@link #shutdown} is called, the thread may need to
* wake up sooner if the shutoff time has been moved sooner) by simply interrupting it. The thread wakes up and
* continues its loop.
*/
@GuardedBy("this")
private Thread createDelayedCloseExecutor()
{
Thread delayedCloseExecutor = new Thread(
() -> {
// The closed = true is visible after close() because there is a happens-before edge between
// delayedCloseExecutor.interrupt() call in close() and catching InterruptedException below in this loop.
while (!closed) {
if (idleCloseTimeNs == null && requestedShutdownTimeNs == null) {
// This is not possible unless there are bugs in the code of EventReceiverFirehose. AssertionError could
// have been thrown instead, but it doesn't seem to make a lot of sense in a background thread. Instead,
// we long the error and continue a loop after some pause.
log.error(
"Either idleCloseTimeNs or requestedShutdownTimeNs must be non-null. "
+ "Please file a bug at https://github.com/apache/incubator-druid/issues"
);
}
if (idleCloseTimeNs != null && idleCloseTimeNs - System.nanoTime() <= 0) { // overflow-aware comparison
log.info("Firehose has been idle for %d ms, closing.", maxIdleTimeMillis);
close();
} else if (requestedShutdownTimeNs != null &&
requestedShutdownTimeNs - System.nanoTime() <= 0) { // overflow-aware comparison
log.info("Closing Firehose after a shutdown request");
close();
}
try {
// It is possible to write code that sleeps until the next the next idleCloseTimeNs or
// requestedShutdownTimeNs, whatever is non-null and sooner, but that's fairly complicated code. That
// complexity perhaps overweighs the minor inefficiency of simply waking up every second.
Threads.sleepFor(1, TimeUnit.SECONDS);
}
catch (InterruptedException ignore) {
// Interruption is a wakeup, continue the loop
}
}
},
"event-receiver-firehose-closer"
);
delayedCloseExecutor.setDaemon(true);
this.delayedCloseExecutor = delayedCloseExecutor;
delayedCloseExecutor.start();
return delayedCloseExecutor;
}
/**
* This method might be called concurrently from multiple threads, if multiple requests arrive to the server at the
* same time (possibly exact duplicates). Concurrency is controlled in {@link #checkProducerSequence}, where only
* requests with "X-Firehose-Producer-Seq" number greater than the max "X-Firehose-Producer-Seq" in previously
* arrived requests are allowed to proceed. After that check requests don't synchronize with each other and
* therefore if two large batches are sent with little interval, the events from the batches might be mixed up in
* {@link #buffer} (if two {@link #addRows(Iterable)} are executed concurrently).
*/
@POST
@Path("/push-events")
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response addAll(
InputStream in,
@Context final HttpServletRequest req
)
public Response addAll(InputStream in, @Context final HttpServletRequest req) throws JsonProcessingException
{
idleWatch.reset();
idleWatch.start();
idleCloseTimeNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxIdleTimeMillis);
Access accessResult = AuthorizationUtils.authorizeResourceAction(
req,
new ResourceAction(
@ -236,9 +364,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
Optional<Response> producerSequenceResponse = checkProducerSequence(req, reqContentType, objectMapper);
if (producerSequenceResponse.isPresent()) {
return producerSequenceResponse.get();
Response producerSequenceResponse = checkProducerSequence(req, reqContentType, objectMapper);
if (producerSequenceResponse != null) {
return producerSequenceResponse;
}
CountingInputStream countingInputStream = new CountingInputStream(in);
@ -274,67 +402,59 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
@Override
public boolean hasMore()
{
synchronized (readLock) {
try {
while (nextRow == null) {
nextRow = buffer.poll(500, TimeUnit.MILLISECONDS);
if (closed) {
break;
}
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
return nextRow != null;
if (rowsRunOut) {
return false;
}
if (nextRow != null) {
return true;
}
Object next;
try {
next = buffer.take();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
//noinspection ObjectEquality
if (next == FIREHOSE_CLOSED) {
rowsRunOut = true;
return false;
}
nextRow = (InputRow) next;
return true;
}
@Nullable
@Override
public InputRow nextRow()
{
synchronized (readLock) {
final InputRow row = nextRow;
final InputRow row = nextRow;
if (row == null) {
throw new NoSuchElementException();
} else {
nextRow = null;
return row;
}
if (row == null) {
throw new NoSuchElementException();
} else {
nextRow = null;
return row;
}
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
// Nothing
}
};
return Runnables.getNoopRunnable();
}
@Override
public int getCurrentBufferSize()
{
// ArrayBlockingQueue's implementation of size() is thread-safe, so we can use that
return buffer.size();
}
@ -350,34 +470,44 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
return bytesReceived.get();
}
/**
* This method is synchronized because it might be called concurrently from multiple threads: from {@link
* #delayedCloseExecutor}, and from the thread that creates and uses the Firehose object.
*/
@Override
public void close()
public synchronized void close()
{
if (!closed) {
log.info("Firehose closing.");
closed = true;
if (closed) {
return;
}
closed = true;
log.info("Firehose closing.");
eventReceiverFirehoseRegister.unregister(serviceName);
if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().unregister(serviceName);
}
exec.shutdown();
idleDetector.shutdown();
idleWatch.stop();
// Critical to add the poison pill to the queue, don't allow interruption.
Uninterruptibles.putUninterruptibly(buffer, FIREHOSE_CLOSED);
eventReceiverFirehoseRegister.unregister(serviceName);
if (chatHandlerProvider != null) {
chatHandlerProvider.unregister(serviceName);
}
if (delayedCloseExecutor != null && !delayedCloseExecutor.equals(Thread.currentThread())) {
// Interrupt delayedCloseExecutor to let it discover that closed flag is already set and exit.
delayedCloseExecutor.interrupt();
}
}
// public for tests
public void addRows(Iterable<InputRow> rows) throws InterruptedException
@VisibleForTesting
void addRows(Iterable<InputRow> rows) throws InterruptedException
{
for (final InputRow row : rows) {
boolean added = false;
while (!closed && !added) {
added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
if (!added) {
long currTime = System.currentTimeMillis();
long lastTime = lastBufferAddFailMsgTime.get();
if (currTime - lastTime > 10000 && lastBufferAddFailMsgTime.compareAndSet(lastTime, currTime)) {
long currTimeNs = System.nanoTime();
long lastTimeNs = lastBufferAddFailLoggingTimeNs.get();
if (currTimeNs - lastTimeNs > TimeUnit.SECONDS.toNanos(10) &&
lastBufferAddFailLoggingTimeNs.compareAndSet(lastTimeNs, currTimeNs)) {
log.warn("Failed to add event to buffer with current size [%s] . Retrying...", buffer.size());
}
}
@ -389,12 +519,19 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
}
}
/**
* This method might be called concurrently from multiple threads, if multiple shutdown requests arrive at the same
* time. No attempts are made to synchronize such requests, or prioritize them a-la "latest shutdown time wins" or
* "soonest shutdown time wins". {@link #delayedCloseExecutor}'s logic (see {@link #createDelayedCloseExecutor()})
* is indifferent to shutdown times jumping in arbitrary directions. But once a shutdown request is made, it can't
* be cancelled entirely, the shutdown time could only be rescheduled with a new request.
*/
@POST
@Path("/shutdown")
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response shutdown(
@QueryParam("shutoffTime") final String shutoffTime,
@QueryParam("shutoffTime") final String shutoffTimeMillis,
@Context final HttpServletRequest req
)
{
@ -411,13 +548,27 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
}
try {
DateTime shutoffAt = shutoffTime == null ? DateTimes.nowUtc() : DateTimes.of(shutoffTime);
log.info("Setting Firehose shutoffTime to %s", shutoffTime);
exec.schedule(
this::close,
shutoffAt.getMillis() - System.currentTimeMillis(),
TimeUnit.MILLISECONDS
);
DateTime shutoffAt = shutoffTimeMillis == null ? DateTimes.nowUtc() : DateTimes.of(shutoffTimeMillis);
log.info("Setting Firehose shutoffTime to %s", shutoffTimeMillis);
long shutoffTimeoutMillis = Math.max(shutoffAt.getMillis() - System.currentTimeMillis(), 0);
requestedShutdownTimeNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(shutoffTimeoutMillis);
Thread delayedCloseExecutor;
// Need to interrupt delayedCloseExecutor because a newly specified shutdown time might be closer than idle
// timeout or previously specified shutdown. Interruption of delayedCloseExecutor lets it adjust the sleep time
// (see the logic of this thread in createDelayedCloseExecutor()).
boolean needToInterruptDelayedCloseExecutor = true;
synchronized (this) {
delayedCloseExecutor = this.delayedCloseExecutor;
if (delayedCloseExecutor == null) {
delayedCloseExecutor = createDelayedCloseExecutor();
// Don't need to interrupt a freshly created thread
needToInterruptDelayedCloseExecutor = false;
}
}
if (needToInterruptDelayedCloseExecutor) {
delayedCloseExecutor.interrupt();
}
return Response.ok().build();
}
catch (IllegalArgumentException e) {
@ -429,7 +580,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
}
@VisibleForTesting
public boolean isClosed()
boolean isClosed()
{
return closed;
}
@ -437,14 +588,17 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
/**
* Checks the request for a producer ID and sequence value. If the producer ID is specified, a corresponding
* sequence value must be specified as well. If the incoming sequence is less than or equal to the last seen
* sequence for that producer ID, the request is ignored
* sequence for that producer ID, the request is ignored.
*
* This method might be called concurrently from multiple threads.
*
* @param req Http request
* @param responseContentType Response content type
* @param responseMapper Response object mapper
* @return Optional of a response to return of an empty optional if the request can proceed
* @return an error response to return or null if the request can proceed
*/
private Optional<Response> checkProducerSequence(
@Nullable
private Response checkProducerSequence(
final HttpServletRequest req,
final String responseContentType,
final ObjectMapper responseMapper
@ -453,61 +607,57 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
final String producerId = req.getHeader("X-Firehose-Producer-Id");
if (producerId == null) {
return Optional.empty();
return null;
}
final String sequenceValue = req.getHeader("X-Firehose-Producer-Seq");
if (sequenceValue == null) {
return Optional.of(
Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.<String, Object>of("error", "Producer sequence value is missing"))
.build()
);
return Response
.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.<String, Object>of("error", "Producer sequence value is missing"))
.build();
}
Long producerSequence = producerSequences.computeIfAbsent(producerId, key -> Long.MIN_VALUE);
if (producerSequences.size() >= MAX_FIREHOSE_PRODUCERS) {
return Optional.of(
Response.status(Response.Status.FORBIDDEN)
.entity(
ImmutableMap.<String, Object>of(
"error",
"Too many individual producer IDs for this firehose. Max is " + MAX_FIREHOSE_PRODUCERS
)
)
.build()
);
return Response
.status(Response.Status.FORBIDDEN)
.entity(
ImmutableMap.of(
"error",
"Too many individual producer IDs for this firehose. Max is " + MAX_FIREHOSE_PRODUCERS
)
)
.build();
}
try {
Long newSequence = Long.parseLong(sequenceValue);
if (newSequence <= producerSequence) {
return Optional.of(
Response.ok(
responseMapper.writeValueAsString(
ImmutableMap.of("eventCount", 0, "skipped", true)
),
responseContentType
).build()
);
}
producerSequences.put(producerId, newSequence);
while (true) {
if (newSequence <= producerSequence) {
return Response.ok(
responseMapper.writeValueAsString(ImmutableMap.of("eventCount", 0, "skipped", true)),
responseContentType
).build();
}
if (producerSequences.replace(producerId, producerSequence, newSequence)) {
return null;
}
producerSequence = producerSequences.get(producerId);
}
}
catch (JsonProcessingException ex) {
throw Throwables.propagate(ex);
throw new RuntimeException(ex);
}
catch (NumberFormatException ex) {
return Optional.of(
Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.<String, Object>of("error", "Producer sequence must be a number"))
.build()
);
return Response
.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.<String, Object>of("error", "Producer sequence must be a number"))
.build();
}
return Optional.empty();
}
}
}

View File

@ -21,12 +21,14 @@ package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@ -37,10 +39,14 @@ import java.util.concurrent.TimeUnit;
/**
* Creates firehoses that shut off at a particular time. Useful for limiting the lifespan of a realtime job.
*
* Each {@link Firehose} created by this factory spins up and manages one thread for calling {@link Firehose#close()}
* asynchronously at the specified {@link #shutoffTime}.
*/
public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowParser>
{
private static final EmittingLogger log = new EmittingLogger(FirehoseFactory.class);
private final FirehoseFactory delegateFactory;
private final DateTime shutoffTime;
@ -63,31 +69,25 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
class TimedShutoffFirehose implements Firehose
{
private final Firehose firehose;
private final ScheduledExecutorService exec;
private final Object shutdownLock = new Object();
private volatile boolean shutdown = false;
private final ScheduledExecutorService shutdownExec;
@GuardedBy("this")
private boolean closed = false;
TimedShutoffFirehose(InputRowParser parser, File temporaryDirectory) throws IOException
{
firehose = delegateFactory.connect(parser, temporaryDirectory);
exec = Execs.scheduledSingleThreaded("timed-shutoff-firehose-%d");
shutdownExec = Execs.scheduledSingleThreaded("timed-shutoff-firehose-%d");
exec.schedule(
new Runnable()
{
@Override
public void run()
{
log.info("Closing delegate firehose.");
shutdownExec.schedule(
() -> {
log.info("Closing delegate firehose.");
shutdown = true;
try {
firehose.close();
}
catch (IOException e) {
log.warn(e, "Failed to close delegate firehose, ignoring.");
}
try {
TimedShutoffFirehose.this.close();
}
catch (IOException e) {
log.warn(e, "Failed to close delegate firehose, ignoring.");
}
},
shutoffTime.getMillis() - System.currentTimeMillis(),
@ -116,14 +116,16 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
return firehose.commit();
}
/**
* This method is synchronized because it might be called concurrently from multiple threads: from {@link
* #shutdownExec}, and explicitly on this Firehose object.
*/
@Override
public void close() throws IOException
public synchronized void close() throws IOException
{
synchronized (shutdownLock) {
if (!shutdown) {
shutdown = true;
firehose.close();
}
if (!closed) {
closed = true;
CloseableUtils.closeBoth(firehose, shutdownExec::shutdownNow);
}
}
}

View File

@ -87,8 +87,22 @@ public class EventReceiverFirehoseIdleTest
@Test(timeout = 40_000L)
public void testIdle() throws Exception
{
Thread.sleep(8_000L);
Assert.assertTrue(firehose.isClosed());
awaitFirehoseClosed();
awaitDelayedExecutorThreadTerminated();
}
private void awaitFirehoseClosed() throws InterruptedException
{
while (!firehose.isClosed()) {
Thread.sleep(50);
}
}
private void awaitDelayedExecutorThreadTerminated() throws InterruptedException
{
while (firehose.getDelayedCloseExecutor().getState() != Thread.State.TERMINATED) {
Thread.sleep(50);
}
}
@Test(timeout = 40_000L)
@ -117,7 +131,7 @@ public class EventReceiverFirehoseIdleTest
Thread.sleep(3_000L);
}
Thread.sleep(5_000L);
Assert.assertTrue(firehose.isClosed());
awaitFirehoseClosed();
awaitDelayedExecutorThreadTerminated();
}
}

View File

@ -57,7 +57,7 @@ public class EventReceiverFirehoseTest
{
private static final int CAPACITY = 300;
private static final int NUM_EVENTS = 100;
private static final long MAX_IDLE_TIME = Long.MAX_VALUE;
private static final long MAX_IDLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(20);
private static final String SERVICE_NAME = "test_firehose";
private final String inputRow = "[{\n"
@ -77,7 +77,7 @@ public class EventReceiverFirehoseTest
eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(
SERVICE_NAME,
CAPACITY,
MAX_IDLE_TIME,
MAX_IDLE_TIME_MILLIS,
null,
new DefaultObjectMapper(),
new DefaultObjectMapper(),
@ -100,8 +100,8 @@ public class EventReceiverFirehoseTest
);
}
@Test
public void testSingleThread() throws IOException
@Test(timeout = 60_000L)
public void testSingleThread() throws IOException, InterruptedException
{
for (int i = 0; i < NUM_EVENTS; ++i) {
setUpRequestExpectations(null, null);
@ -138,9 +138,10 @@ public class EventReceiverFirehoseTest
Assert.assertFalse(firehose.hasMore());
Assert.assertEquals(0, Iterables.size(register.getMetrics()));
awaitDelayedExecutorThreadTerminated();
}
@Test
@Test(timeout = 60_000L)
public void testMultipleThreads() throws InterruptedException, IOException, TimeoutException, ExecutionException
{
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED))
@ -210,6 +211,8 @@ public class EventReceiverFirehoseTest
Assert.assertFalse(firehose.hasMore());
Assert.assertEquals(0, Iterables.size(register.getMetrics()));
awaitDelayedExecutorThreadTerminated();
executorService.shutdownNow();
}
@ -219,7 +222,7 @@ public class EventReceiverFirehoseTest
EventReceiverFirehoseFactory eventReceiverFirehoseFactory2 = new EventReceiverFirehoseFactory(
SERVICE_NAME,
CAPACITY,
MAX_IDLE_TIME,
MAX_IDLE_TIME_MILLIS,
null,
new DefaultObjectMapper(),
new DefaultObjectMapper(),
@ -259,11 +262,24 @@ public class EventReceiverFirehoseTest
EasyMock.replay(req);
firehose.shutdown(DateTimes.nowUtc().minusMinutes(2).toString(), req);
awaitFirehoseClosed();
awaitDelayedExecutorThreadTerminated();
}
private void awaitFirehoseClosed() throws InterruptedException
{
while (!firehose.isClosed()) {
Thread.sleep(50);
}
}
private void awaitDelayedExecutorThreadTerminated() throws InterruptedException
{
while (firehose.getDelayedCloseExecutor().getState() != Thread.State.TERMINATED) {
Thread.sleep(50);
}
}
@Test(timeout = 60_000L)
public void testShutdown() throws Exception
{
@ -279,9 +295,8 @@ public class EventReceiverFirehoseTest
EasyMock.replay(req);
firehose.shutdown(DateTimes.nowUtc().plusMillis(100).toString(), req);
while (!firehose.isClosed()) {
Thread.sleep(50);
}
awaitFirehoseClosed();
awaitDelayedExecutorThreadTerminated();
}
@Test
@ -322,7 +337,6 @@ public class EventReceiverFirehoseTest
firehose.close();
Assert.assertFalse(firehose.hasMore());
Assert.assertEquals(0, Iterables.size(register.getMetrics()));
}
@Test