From 5e96d3e59aeff620257ceea460b8d46741b7f46d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 8 Mar 2020 13:48:09 -0400 Subject: [PATCH] Use given executor for global checkpoint listener (#53260) Today when notifying a global checkpoint listener, we use the listener thread pool. This commit turns this inside out so that the global checkpoint listener must provide an executor on which to notify the listener. --- .../shard/GlobalCheckpointListeners.java | 72 +++---- .../elasticsearch/index/shard/IndexShard.java | 2 +- .../shard/GlobalCheckpointListenersIT.java | 170 ++++++++++++++++ .../shard/GlobalCheckpointListenersTests.java | 192 ++++++++++++------ .../index/shard/IndexShardIT.java | 82 -------- .../xpack/ccr/action/ShardChangesAction.java | 24 ++- 6 files changed, 360 insertions(+), 182 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersIT.java diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index 57e28d96ad2..d2172dc056e 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -51,8 +51,15 @@ public class GlobalCheckpointListeners implements Closeable { /** * A global checkpoint listener consisting of a callback that is notified when the global checkpoint is updated or the shard is closed. */ - @FunctionalInterface public interface GlobalCheckpointListener { + + /** + * The executor on which the listener is notified. + * + * @return the executor + */ + Executor executor(); + /** * Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint * will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null and an @@ -63,6 +70,7 @@ public class GlobalCheckpointListeners implements Closeable { * @param e if non-null, the shard is closed or the listener timed out */ void accept(long globalCheckpoint, Exception e); + } // guarded by this @@ -71,7 +79,6 @@ public class GlobalCheckpointListeners implements Closeable { private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO; private final ShardId shardId; - private final Executor executor; private final ScheduledExecutorService scheduler; private final Logger logger; @@ -79,17 +86,14 @@ public class GlobalCheckpointListeners implements Closeable { * Construct a global checkpoint listeners collection. * * @param shardId the shard ID on which global checkpoint updates can be listened to - * @param executor the executor for listener notifications * @param scheduler the executor used for scheduling timeouts * @param logger a shard-level logger */ GlobalCheckpointListeners( - final ShardId shardId, - final Executor executor, - final ScheduledExecutorService scheduler, - final Logger logger) { + final ShardId shardId, + final ScheduledExecutorService scheduler, + final Logger logger) { this.shardId = Objects.requireNonNull(shardId, "shardId"); - this.executor = Objects.requireNonNull(executor, "executor"); this.scheduler = Objects.requireNonNull(scheduler, "scheduler"); this.logger = Objects.requireNonNull(logger, "logger"); } @@ -109,12 +113,12 @@ public class GlobalCheckpointListeners implements Closeable { */ synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) { if (closed) { - executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId))); + notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)); return; } if (lastKnownGlobalCheckpoint >= waitingForGlobalCheckpoint) { // notify directly - executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null)); + notifyListener(listener, lastKnownGlobalCheckpoint, null); } else { if (timeout == null) { listeners.put(listener, Tuple.tuple(waitingForGlobalCheckpoint, null)); @@ -140,7 +144,7 @@ public class GlobalCheckpointListeners implements Closeable { if (removed) { final TimeoutException e = new TimeoutException(timeout.getStringRep()); logger.trace("global checkpoint listener timed out", e); - executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e)); + notifyListener(listener, UNASSIGNED_SEQ_NO, e); } }, timeout.nanos(), @@ -193,7 +197,6 @@ public class GlobalCheckpointListeners implements Closeable { private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) { assert Thread.holdsLock(this) : Thread.currentThread(); - assertNotification(globalCheckpoint, e); // early return if there are no listeners if (listeners.isEmpty()) { @@ -214,37 +217,38 @@ public class GlobalCheckpointListeners implements Closeable { listeners.clear(); } if (listenersToNotify.isEmpty() == false) { - executor.execute(() -> - listenersToNotify - .forEach((listener, t) -> { - /* - * We do not want to interrupt any timeouts that fired, these will detect that the listener has been - * notified and not trigger the timeout. - */ - FutureUtils.cancel(t.v2()); - notifyListener(listener, globalCheckpoint, e); - })); + listenersToNotify + .forEach((listener, t) -> { + /* + * We do not want to interrupt any timeouts that fired, these will detect that the listener has been notified and not + * trigger the timeout. + */ + FutureUtils.cancel(t.v2()); + notifyListener(listener, globalCheckpoint, e); + }); } } private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final Exception e) { assertNotification(globalCheckpoint, e); - try { - listener.accept(globalCheckpoint, e); - } catch (final Exception caught) { - if (globalCheckpoint != UNASSIGNED_SEQ_NO) { - logger.warn( + listener.executor().execute(() -> { + try { + listener.accept(globalCheckpoint, e); + } catch (final Exception caught) { + if (globalCheckpoint != UNASSIGNED_SEQ_NO) { + logger.warn( new ParameterizedMessage( - "error notifying global checkpoint listener of updated global checkpoint [{}]", - globalCheckpoint), + "error notifying global checkpoint listener of updated global checkpoint [{}]", + globalCheckpoint), caught); - } else if (e instanceof IndexShardClosedException) { - logger.warn("error notifying global checkpoint listener of closed shard", caught); - } else { - logger.warn("error notifying global checkpoint listener of timeout", caught); + } else if (e instanceof IndexShardClosedException) { + logger.warn("error notifying global checkpoint listener of closed shard", caught); + } else { + logger.warn("error notifying global checkpoint listener of timeout", caught); + } } - } + }); } private void assertNotification(final long globalCheckpoint, final Exception e) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index db0b3651d79..a207653b6db 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -335,7 +335,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl final long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); this.pendingPrimaryTerm = primaryTerm; this.globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger); + new GlobalCheckpointListeners(shardId, threadPool.scheduler(), logger); this.replicationTracker = new ReplicationTracker( shardId, aId, diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersIT.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersIT.java new file mode 100644 index 00000000000..fb77f327bce --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersIT.java @@ -0,0 +1,170 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.junit.After; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.instanceOf; + +public class GlobalCheckpointListenersIT extends ESSingleNodeTestCase { + + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + @After + public void shutdownExecutor() { + executor.shutdown(); + } + + public void testGlobalCheckpointListeners() throws Exception { + createIndex("test", Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0).build()); + ensureGreen(); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexService test = indicesService.indexService(resolveIndex("test")); + final IndexShard shard = test.getShardOrNull(0); + final int numberOfUpdates = randomIntBetween(1, 128); + for (int i = 0; i < numberOfUpdates; i++) { + final int index = i; + final AtomicLong globalCheckpoint = new AtomicLong(); + shard.addGlobalCheckpointListener( + i, + new GlobalCheckpointListeners.GlobalCheckpointListener() { + + @Override + public Executor executor() { + return executor; + } + + @Override + public void accept(final long g, final Exception e) { + assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED)); + assertNull(e); + globalCheckpoint.set(g); + } + + }, + null); + client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); + assertBusy(() -> assertThat(globalCheckpoint.get(), equalTo((long) index))); + // adding a listener expecting a lower global checkpoint should fire immediately + final AtomicLong immediateGlobalCheckpint = new AtomicLong(); + shard.addGlobalCheckpointListener( + randomLongBetween(0, i), + new GlobalCheckpointListeners.GlobalCheckpointListener() { + + @Override + public Executor executor() { + return executor; + } + + @Override + public void accept(final long g, final Exception e) { + assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED)); + assertNull(e); + immediateGlobalCheckpint.set(g); + } + + }, + null); + assertBusy(() -> assertThat(immediateGlobalCheckpint.get(), equalTo((long) index))); + } + final AtomicBoolean invoked = new AtomicBoolean(); + shard.addGlobalCheckpointListener( + numberOfUpdates, + new GlobalCheckpointListeners.GlobalCheckpointListener() { + + @Override + public Executor executor() { + return executor; + } + + @Override + public void accept(final long g, final Exception e) { + invoked.set(true); + assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); + assertThat(e, instanceOf(IndexShardClosedException.class)); + assertThat(((IndexShardClosedException)e).getShardId(), equalTo(shard.shardId())); + } + + }, + null); + shard.close("closed", randomBoolean()); + assertBusy(() -> assertTrue(invoked.get())); + } + + public void testGlobalCheckpointListenerTimeout() throws InterruptedException { + createIndex("test", Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0).build()); + ensureGreen(); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexService test = indicesService.indexService(resolveIndex("test")); + final IndexShard shard = test.getShardOrNull(0); + final AtomicBoolean notified = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(1); + final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); + shard.addGlobalCheckpointListener( + 0, + new GlobalCheckpointListeners.GlobalCheckpointListener() { + + @Override + public Executor executor() { + return executor; + } + + @Override + public void accept(final long g, final Exception e) { + try { + notified.set(true); + assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); + assertNotNull(e); + assertThat(e, instanceOf(TimeoutException.class)); + assertThat(e.getMessage(), equalTo(timeout.getStringRep())); + } finally { + latch.countDown(); + } + } + + }, + timeout); + latch.await(); + assertTrue(notified.get()); + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index d71bade29a3..7d09861466f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -67,6 +67,15 @@ import static org.mockito.Mockito.verify; public class GlobalCheckpointListenersTests extends ESTestCase { + @FunctionalInterface + interface TestGlobalCheckpointListener extends GlobalCheckpointListeners.GlobalCheckpointListener { + + default Executor executor() { + return Runnable::run; + } + + } + private final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); private final ScheduledThreadPoolExecutor scheduler = new Scheduler.SafeScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(Settings.EMPTY, "scheduler")); @@ -78,17 +87,19 @@ public class GlobalCheckpointListenersTests extends ESTestCase { public void testGlobalCheckpointUpdated() throws IOException { final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); + new GlobalCheckpointListeners(shardId, scheduler, logger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final int numberOfListeners = randomIntBetween(0, 64); final Map listeners = new HashMap<>(); final Map notifiedListeners = new HashMap<>(); for (int i = 0; i < numberOfListeners; i++) { - final GlobalCheckpointListeners.GlobalCheckpointListener listener = new GlobalCheckpointListeners.GlobalCheckpointListener() { + final TestGlobalCheckpointListener listener = new TestGlobalCheckpointListener() { + @Override public void accept(final long g, final Exception e) { notifiedListeners.put(this, g); } + }; final long waitingGlobalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); listeners.put(listener, waitingGlobalCheckpoint); @@ -133,7 +144,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { public void testListenersReadyToBeNotified() throws IOException { final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); + new GlobalCheckpointListeners(shardId, scheduler, logger); final long globalCheckpoint = randomLongBetween(0, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); final int numberOfListeners = randomIntBetween(0, 16); @@ -165,7 +176,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { public void testFailingListenerReadyToBeNotified() { final Logger mockLogger = mock(Logger.class); final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger); + new GlobalCheckpointListeners(shardId, scheduler, mockLogger); final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); final int numberOfListeners = randomIntBetween(0, 16); @@ -207,7 +218,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { public void testClose() throws IOException { final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); + new GlobalCheckpointListeners(shardId, scheduler, logger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final int numberOfListeners = randomIntBetween(0, 16); final Exception[] exceptions = new Exception[numberOfListeners]; @@ -235,7 +246,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { public void testAddAfterClose() throws InterruptedException, IOException { final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); + new GlobalCheckpointListeners(shardId, scheduler, logger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); globalCheckpointListeners.close(); final AtomicBoolean invoked = new AtomicBoolean(); @@ -254,7 +265,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { public void testFailingListenerOnUpdate() { final Logger mockLogger = mock(Logger.class); final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger); + new GlobalCheckpointListeners(shardId, scheduler, mockLogger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final int numberOfListeners = randomIntBetween(0, 16); final boolean[] failures = new boolean[numberOfListeners]; @@ -308,7 +319,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { public void testFailingListenerOnClose() throws IOException { final Logger mockLogger = mock(Logger.class); final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger); + new GlobalCheckpointListeners(shardId, scheduler, mockLogger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final int numberOfListeners = randomIntBetween(0, 16); final boolean[] failures = new boolean[numberOfListeners]; @@ -360,24 +371,35 @@ public class GlobalCheckpointListenersTests extends ESTestCase { count.incrementAndGet(); command.run(); }; - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); final AtomicInteger notified = new AtomicInteger(); final int numberOfListeners = randomIntBetween(0, 16); for (int i = 0; i < numberOfListeners; i++) { globalCheckpointListeners.add( - 0, - maybeMultipleInvocationProtectingListener((g, e) -> { - notified.incrementAndGet(); - assertThat(g, equalTo(globalCheckpoint)); - assertNull(e); + 0, + maybeMultipleInvocationProtectingListener( + new TestGlobalCheckpointListener() { + + @Override + public Executor executor() { + return executor; + } + + @Override + public void accept(final long g, final Exception e) { + notified.incrementAndGet(); + assertThat(g, equalTo(globalCheckpoint)); + assertNull(e); + } + }), - null); + null); } globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); assertThat(notified.get(), equalTo(numberOfListeners)); - assertThat(count.get(), equalTo(numberOfListeners == 0 ? 0 : 1)); + assertThat(count.get(), equalTo(numberOfListeners)); } public void testNotificationOnClosedUsesExecutor() throws IOException { @@ -386,21 +408,32 @@ public class GlobalCheckpointListenersTests extends ESTestCase { count.incrementAndGet(); command.run(); }; - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger); globalCheckpointListeners.close(); final AtomicInteger notified = new AtomicInteger(); final int numberOfListeners = randomIntBetween(0, 16); for (int i = 0; i < numberOfListeners; i++) { globalCheckpointListeners.add( - NO_OPS_PERFORMED, - maybeMultipleInvocationProtectingListener((g, e) -> { - notified.incrementAndGet(); - assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); - assertNotNull(e); - assertThat(e, instanceOf(IndexShardClosedException.class)); - assertThat(((IndexShardClosedException) e).getShardId(), equalTo(shardId)); + NO_OPS_PERFORMED, + maybeMultipleInvocationProtectingListener( + new TestGlobalCheckpointListener() { + + @Override + public Executor executor() { + return executor; + } + + @Override + public void accept(final long g, final Exception e) { + notified.incrementAndGet(); + assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); + assertNotNull(e); + assertThat(e, instanceOf(IndexShardClosedException.class)); + assertThat(((IndexShardClosedException) e).getShardId(), equalTo(shardId)); + } + }), - null); + null); } assertThat(notified.get(), equalTo(numberOfListeners)); assertThat(count.get(), equalTo(numberOfListeners)); @@ -412,20 +445,30 @@ public class GlobalCheckpointListenersTests extends ESTestCase { count.incrementAndGet(); command.run(); }; - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger); final long globalCheckpoint = randomNonNegativeLong(); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); final AtomicInteger notified = new AtomicInteger(); final int numberOfListeners = randomIntBetween(0, 16); for (int i = 0; i < numberOfListeners; i++) { globalCheckpointListeners.add( - randomLongBetween(0, globalCheckpoint), - maybeMultipleInvocationProtectingListener((g, e) -> { - notified.incrementAndGet(); - assertThat(g, equalTo(globalCheckpoint)); - assertNull(e); + randomLongBetween(0, globalCheckpoint), + maybeMultipleInvocationProtectingListener( + new TestGlobalCheckpointListener() { + + @Override + public Executor executor() { + return executor; + } + + @Override + public void accept(final long g, final Exception e) { + notified.incrementAndGet(); + assertThat(g, equalTo(globalCheckpoint)); + assertNull(e); + } }), - null); + null); } assertThat(notified.get(), equalTo(numberOfListeners)); assertThat(count.get(), equalTo(numberOfListeners)); @@ -433,7 +476,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { public void testConcurrency() throws Exception { final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, 8)); - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger); final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get()); // we are going to synchronize the actions of three threads: the updating thread, the listener thread, and the main test thread @@ -469,14 +512,24 @@ public class GlobalCheckpointListenersTests extends ESTestCase { invocations.add(invocation); // sometimes this will notify the listener immediately globalCheckpointListeners.add( - globalCheckpoint.get(), - maybeMultipleInvocationProtectingListener( - (g, e) -> { - if (invocation.compareAndSet(false, true) == false) { - throw new IllegalStateException("listener invoked twice"); - } - }), - randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1)))); + globalCheckpoint.get(), + maybeMultipleInvocationProtectingListener( + new TestGlobalCheckpointListener() { + + @Override + public Executor executor() { + return executor; + } + + @Override + public void accept(final long g, final Exception e) { + if (invocation.compareAndSet(false, true) == false) { + throw new IllegalStateException("listener invoked twice"); + } + } + + }), + randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1)))); } // synchronize ending with the updating thread and the main test thread awaitQuietly(barrier); @@ -506,7 +559,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { public void testTimeout() throws InterruptedException { final Logger mockLogger = mock(Logger.class); final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger); + new GlobalCheckpointListeners(shardId, scheduler, mockLogger); final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); final AtomicBoolean notified = new AtomicBoolean(); final CountDownLatch latch = new CountDownLatch(1); @@ -541,22 +594,33 @@ public class GlobalCheckpointListenersTests extends ESTestCase { count.incrementAndGet(); command.run(); }; - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger); final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); final AtomicBoolean notified = new AtomicBoolean(); final CountDownLatch latch = new CountDownLatch(1); globalCheckpointListeners.add( - NO_OPS_PERFORMED, - maybeMultipleInvocationProtectingListener((g, e) -> { - try { - notified.set(true); - assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); - assertThat(e, instanceOf(TimeoutException.class)); - } finally { - latch.countDown(); + NO_OPS_PERFORMED, + maybeMultipleInvocationProtectingListener( + new TestGlobalCheckpointListener() { + + @Override + public Executor executor() { + return executor; } + + @Override + public void accept(final long g, final Exception e) { + try { + notified.set(true); + assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); + assertThat(e, instanceOf(TimeoutException.class)); + } finally { + latch.countDown(); + } + } + }), - timeout); + timeout); latch.await(); // ensure the listener notification occurred on the executor assertTrue(notified.get()); @@ -571,7 +635,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { return null; }).when(mockLogger).warn(argThat(any(String.class)), argThat(any(RuntimeException.class))); final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger); + new GlobalCheckpointListeners(shardId, scheduler, mockLogger); final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); globalCheckpointListeners.add( NO_OPS_PERFORMED, @@ -591,7 +655,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { public void testTimeoutCancelledAfterListenerNotified() { final GlobalCheckpointListeners globalCheckpointListeners = - new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); + new GlobalCheckpointListeners(shardId, scheduler, logger); final TimeValue timeout = TimeValue.timeValueNanos(Long.MAX_VALUE); final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener = maybeMultipleInvocationProtectingListener((g, e) -> { @@ -606,14 +670,24 @@ public class GlobalCheckpointListenersTests extends ESTestCase { } private GlobalCheckpointListeners.GlobalCheckpointListener maybeMultipleInvocationProtectingListener( - final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener) { + final TestGlobalCheckpointListener globalCheckpointListener) { if (Assertions.ENABLED) { final AtomicBoolean invoked = new AtomicBoolean(); - return (g, e) -> { - if (invoked.compareAndSet(false, true) == false) { - throw new AssertionError("listener invoked twice"); + return new GlobalCheckpointListeners.GlobalCheckpointListener() { + + @Override + public Executor executor() { + return globalCheckpointListener.executor(); } - globalCheckpointListener.accept(g, e); + + @Override + public void accept(final long g, final Exception e) { + if (invoked.compareAndSet(false, true) == false) { + throw new AssertionError("listener invoked twice"); + } + globalCheckpointListener.accept(g, e); + } + }; } else { return globalCheckpointListener; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 7a7cc29aadd..29b27a7078b 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -101,9 +101,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Stream; @@ -116,8 +114,6 @@ import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.NONE; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; -import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; -import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog; import static org.elasticsearch.index.shard.IndexShardTestCase.recoverFromStore; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -128,7 +124,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; @@ -688,83 +683,6 @@ public class IndexShardIT extends ESSingleNodeTestCase { return shardRouting; } - public void testGlobalCheckpointListeners() throws Exception { - createIndex("test", Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0).build()); - ensureGreen(); - final IndicesService indicesService = getInstanceFromNode(IndicesService.class); - final IndexService test = indicesService.indexService(resolveIndex("test")); - final IndexShard shard = test.getShardOrNull(0); - final int numberOfUpdates = randomIntBetween(1, 128); - for (int i = 0; i < numberOfUpdates; i++) { - final int index = i; - final AtomicLong globalCheckpoint = new AtomicLong(); - shard.addGlobalCheckpointListener( - i, - (g, e) -> { - assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED)); - assertNull(e); - globalCheckpoint.set(g); - }, - null); - client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); - assertBusy(() -> assertThat(globalCheckpoint.get(), equalTo((long) index))); - // adding a listener expecting a lower global checkpoint should fire immediately - final AtomicLong immediateGlobalCheckpint = new AtomicLong(); - shard.addGlobalCheckpointListener( - randomLongBetween(0, i), - (g, e) -> { - assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED)); - assertNull(e); - immediateGlobalCheckpint.set(g); - }, - null); - assertBusy(() -> assertThat(immediateGlobalCheckpint.get(), equalTo((long) index))); - } - final AtomicBoolean invoked = new AtomicBoolean(); - shard.addGlobalCheckpointListener( - numberOfUpdates, - (g, e) -> { - invoked.set(true); - assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); - assertThat(e, instanceOf(IndexShardClosedException.class)); - assertThat(((IndexShardClosedException)e).getShardId(), equalTo(shard.shardId())); - }, - null); - shard.close("closed", randomBoolean()); - assertBusy(() -> assertTrue(invoked.get())); - } - - public void testGlobalCheckpointListenerTimeout() throws InterruptedException { - createIndex("test", Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0).build()); - ensureGreen(); - final IndicesService indicesService = getInstanceFromNode(IndicesService.class); - final IndexService test = indicesService.indexService(resolveIndex("test")); - final IndexShard shard = test.getShardOrNull(0); - final AtomicBoolean notified = new AtomicBoolean(); - final CountDownLatch latch = new CountDownLatch(1); - final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); - shard.addGlobalCheckpointListener( - 0, - (g, e) -> { - try { - notified.set(true); - assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); - assertNotNull(e); - assertThat(e, instanceOf(TimeoutException.class)); - assertThat(e.getMessage(), equalTo(timeout.getStringRep())); - } finally { - latch.countDown(); - } - }, - timeout); - latch.await(); - assertTrue(notified.get()); - } - public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Exception { createIndex("test", Settings.builder() .put("index.number_of_shards", 1) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 24ebd05a2d1..d1270aeefd2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -8,10 +8,10 @@ package org.elasticsearch.xpack.ccr.action; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; -import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.single.shard.SingleShardRequest; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; @@ -32,6 +32,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.MissingHistoryOperationsException; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.shard.GlobalCheckpointListeners; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardNotStartedException; import org.elasticsearch.index.shard.IndexShardState; @@ -48,6 +49,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -388,18 +390,28 @@ public class ShardChangesAction extends ActionType seqNoStats.getGlobalCheckpoint(), request.getFromSeqNo()); indexShard.addGlobalCheckpointListener( - request.getFromSeqNo(), - (g, e) -> { + request.getFromSeqNo(), + new GlobalCheckpointListeners.GlobalCheckpointListener() { + + @Override + public Executor executor() { + return threadPool.executor(ThreadPool.Names.LISTENER); + } + + @Override + public void accept(final long g, final Exception e) { if (g != UNASSIGNED_SEQ_NO) { assert request.getFromSeqNo() <= g - : shardId + " only advanced to [" + g + "] while waiting for [" + request.getFromSeqNo() + "]"; + : shardId + " only advanced to [" + g + "] while waiting for [" + request.getFromSeqNo() + "]"; globalCheckpointAdvanced(shardId, g, request, listener); } else { assert e != null; globalCheckpointAdvancementFailure(shardId, e, request, listener, indexShard); } - }, - request.getPollTimeout()); + } + + }, + request.getPollTimeout()); } else { super.asyncShardOperation(request, shardId, listener); }