Notify refresh listeners on the calling thread (#53259)
Today we notify refresh listeners by forking to the listener thread pool and then serially notifying listeners on a thread there. Refreshes are expensive though, so the expectation is that we are executing refreshes on threads that can afford an expensive operation (e.g., not a network thread) and as such, executing listeners that we expect to be cheap aon the calling thread is okay. This commit removes the forking of notifying refresh listeners to run directly on the calling thread that executed a refresh.
This commit is contained in:
parent
341417613e
commit
c5738ae312
|
@ -3179,7 +3179,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
return new RefreshListeners(
|
return new RefreshListeners(
|
||||||
indexSettings::getMaxRefreshListeners,
|
indexSettings::getMaxRefreshListeners,
|
||||||
() -> refresh("too_many_listeners"),
|
() -> refresh("too_many_listeners"),
|
||||||
threadPool.executor(ThreadPool.Names.LISTENER),
|
|
||||||
logger, threadPool.getThreadContext(),
|
logger, threadPool.getThreadContext(),
|
||||||
externalRefreshMetric);
|
externalRefreshMetric);
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,6 @@ import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.IntSupplier;
|
import java.util.function.IntSupplier;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
@ -48,7 +47,6 @@ import static java.util.Objects.requireNonNull;
|
||||||
public final class RefreshListeners implements ReferenceManager.RefreshListener, Closeable {
|
public final class RefreshListeners implements ReferenceManager.RefreshListener, Closeable {
|
||||||
private final IntSupplier getMaxRefreshListeners;
|
private final IntSupplier getMaxRefreshListeners;
|
||||||
private final Runnable forceRefresh;
|
private final Runnable forceRefresh;
|
||||||
private final Executor listenerExecutor;
|
|
||||||
private final Logger logger;
|
private final Logger logger;
|
||||||
private final ThreadContext threadContext;
|
private final ThreadContext threadContext;
|
||||||
private final MeanMetric refreshMetric;
|
private final MeanMetric refreshMetric;
|
||||||
|
@ -82,11 +80,15 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
|
||||||
*/
|
*/
|
||||||
private volatile Translog.Location lastRefreshedLocation;
|
private volatile Translog.Location lastRefreshedLocation;
|
||||||
|
|
||||||
public RefreshListeners(IntSupplier getMaxRefreshListeners, Runnable forceRefresh, Executor listenerExecutor, Logger logger,
|
public RefreshListeners(
|
||||||
ThreadContext threadContext, MeanMetric refreshMetric) {
|
final IntSupplier getMaxRefreshListeners,
|
||||||
|
final Runnable forceRefresh,
|
||||||
|
final Logger logger,
|
||||||
|
final ThreadContext threadContext,
|
||||||
|
final MeanMetric refreshMetric
|
||||||
|
) {
|
||||||
this.getMaxRefreshListeners = getMaxRefreshListeners;
|
this.getMaxRefreshListeners = getMaxRefreshListeners;
|
||||||
this.forceRefresh = forceRefresh;
|
this.forceRefresh = forceRefresh;
|
||||||
this.listenerExecutor = listenerExecutor;
|
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.threadContext = threadContext;
|
this.threadContext = threadContext;
|
||||||
this.refreshMetric = refreshMetric;
|
this.refreshMetric = refreshMetric;
|
||||||
|
@ -282,24 +284,22 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Lastly, fire the listeners that are ready on the listener thread pool
|
// Lastly, fire the listeners that are ready
|
||||||
fireListeners(listenersToFire);
|
fireListeners(listenersToFire);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fire some listeners. Does nothing if the list of listeners is null.
|
* Fire some listeners. Does nothing if the list of listeners is null.
|
||||||
*/
|
*/
|
||||||
private void fireListeners(List<Tuple<Translog.Location, Consumer<Boolean>>> listenersToFire) {
|
private void fireListeners(final List<Tuple<Translog.Location, Consumer<Boolean>>> listenersToFire) {
|
||||||
if (listenersToFire != null) {
|
if (listenersToFire != null) {
|
||||||
listenerExecutor.execute(() -> {
|
for (final Tuple<Translog.Location, Consumer<Boolean>> listener : listenersToFire) {
|
||||||
for (Tuple<Translog.Location, Consumer<Boolean>> listener : listenersToFire) {
|
|
||||||
try {
|
try {
|
||||||
listener.v2().accept(false);
|
listener.v2().accept(false);
|
||||||
} catch (Exception e) {
|
} catch (final Exception e) {
|
||||||
logger.warn("Error firing refresh listener", e);
|
logger.warn("error firing refresh listener", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -102,8 +102,6 @@ public class RefreshListenersTests extends ESTestCase {
|
||||||
listeners = new RefreshListeners(
|
listeners = new RefreshListeners(
|
||||||
() -> maxListeners,
|
() -> maxListeners,
|
||||||
() -> engine.refresh("too-many-listeners"),
|
() -> engine.refresh("too-many-listeners"),
|
||||||
// Immediately run listeners rather than adding them to the listener thread pool like IndexShard does to simplify the test.
|
|
||||||
Runnable::run,
|
|
||||||
logger,
|
logger,
|
||||||
threadPool.getThreadContext(),
|
threadPool.getThreadContext(),
|
||||||
refreshMetric);
|
refreshMetric);
|
||||||
|
|
Loading…
Reference in New Issue