From 162ad1251cca0e26121d3db1a989a2bb6ba50749 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sat, 27 Aug 2016 21:42:38 +0200 Subject: [PATCH] Fsync documents in an async fashion (#20145) today we fsync in a blocking fashion where all threads block while another syncs. Yet, we can improve this and make use of the async infrastrucutre added for `wait_for_refresh` and make fsyncing single threaded while all other threads can continue indexing. The syncing thread then notifies a listener once the requests location is synced. This also allows to send docs to replicas before its actually fsynced allowing for cocurrent replica processing. This patch has a significant impact on performance on slower discs. An initial single node benchmark shows that on very fast SSDs there is no noticable impact but on slow spinning disk this patch shows a ~32% performance improvement. ``` NVME SSD: 336ec0ac9a12b967163a4a21f75beb41c8582cde (master): Total docs/sec: 47200.9 Total docs/sec: 46440.4 23543a97e3e7f72a31e26b50e00931919784426c (async wait for translog): Total docs/sec: 47461.6 Total docs/sec: 46188.3 ------------------------------------------------------------------- Spinning disk: 336ec0ac9a12b967163a4a21f75beb41c8582cde (master): Total docs/sec: 22733.0 Total docs/sec: 24129.8 23543a97e3e7f72a31e26b50e00931919784426c (async wait for translog): Total docs/sec: 32724.1 Total docs/sec: 32845.4 -------------------------------------------------------------------- ``` --- .../replication/TransportWriteAction.java | 176 +++++++++++++----- .../util/concurrent/AsyncIOProcessor.java | 128 +++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 42 +++-- .../index/translog/Translog.java | 21 ++- .../concurrent/AsyncIOProcessorTests.java | 160 ++++++++++++++++ .../index/shard/IndexShardTests.java | 37 ++++ .../index/translog/TranslogTests.java | 32 ++++ 7 files changed, 539 insertions(+), 57 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java create mode 100644 core/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 9261bea945c..c472c7454ab 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -39,6 +39,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; /** @@ -128,29 +131,38 @@ public abstract class TransportWriteAction< * We call this before replication because this might wait for a refresh and that can take a while. This way we wait for the * refresh in parallel on the primary and on the replica. */ - postWriteActions(indexShard, request, location, this, logger); + new AsyncAfterWriteAction(indexShard, request, location, this, logger).run(); } @Override public synchronized void respond(ActionListener listener) { this.listener = listener; - respondIfPossible(); + respondIfPossible(null); } /** * Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}. */ - protected void respondIfPossible() { + protected void respondIfPossible(Exception ex) { if (finishedAsyncActions && listener != null) { - super.respond(listener); + if (ex == null) { + super.respond(listener); + } else { + listener.onFailure(ex); + } } } + public synchronized void onFailure(Exception exception) { + finishedAsyncActions = true; + respondIfPossible(exception); + } + @Override - public synchronized void respondAfterAsyncAction(boolean forcedRefresh) { + public synchronized void onSuccess(boolean forcedRefresh) { finalResponse.setForcedRefresh(forcedRefresh); finishedAsyncActions = true; - respondIfPossible(); + respondIfPossible(null); } } @@ -162,68 +174,144 @@ public abstract class TransportWriteAction< private ActionListener listener; public WriteReplicaResult(IndexShard indexShard, ReplicatedWriteRequest request, Translog.Location location) { - postWriteActions(indexShard, request, location, this, logger); + new AsyncAfterWriteAction(indexShard, request, location, this, logger).run(); } @Override public void respond(ActionListener listener) { this.listener = listener; - respondIfPossible(); + respondIfPossible(null); } /** * Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}. */ - protected void respondIfPossible() { + protected void respondIfPossible(Exception ex) { if (finishedAsyncActions && listener != null) { - super.respond(listener); + if (ex == null) { + super.respond(listener); + } else { + listener.onFailure(ex); + } } } @Override - public synchronized void respondAfterAsyncAction(boolean forcedRefresh) { + public void onFailure(Exception ex) { finishedAsyncActions = true; - respondIfPossible(); + respondIfPossible(ex); + } + + @Override + public synchronized void onSuccess(boolean forcedRefresh) { + finishedAsyncActions = true; + respondIfPossible(null); } } + /** + * callback used by {@link AsyncAfterWriteAction} to notify that all post + * process actions have been executed + */ private interface RespondingWriteResult { - void respondAfterAsyncAction(boolean forcedRefresh); + /** + * Called on successful processing of all post write actions + * @param forcedRefresh true iff this write has caused a refresh + */ + void onSuccess(boolean forcedRefresh); + + /** + * Called on failure if a post action failed. + */ + void onFailure(Exception ex); } - static void postWriteActions(final IndexShard indexShard, - final WriteRequest request, - @Nullable final Translog.Location location, - final RespondingWriteResult respond, - final ESLogger logger) { - boolean pendingOps = false; - boolean immediateRefresh = false; - switch (request.getRefreshPolicy()) { - case IMMEDIATE: - indexShard.refresh("refresh_flag_index"); - immediateRefresh = true; - break; - case WAIT_UNTIL: - if (location != null) { - pendingOps = true; - indexShard.addRefreshListener(location, forcedRefresh -> { - if (forcedRefresh) { - logger.warn("block_until_refresh request ran out of slots and forced a refresh: [{}]", request); - } - respond.respondAfterAsyncAction(forcedRefresh); - }); + /** + * This class encapsulates post write actions like async waits for + * translog syncs or waiting for a refresh to happen making the write operation + * visible. + */ + static final class AsyncAfterWriteAction { + private final Location location; + private final boolean waitUntilRefresh; + private final boolean sync; + private final AtomicInteger pendingOps = new AtomicInteger(1); + private final AtomicBoolean refreshed = new AtomicBoolean(false); + private final AtomicReference syncFailure = new AtomicReference<>(null); + private final RespondingWriteResult respond; + private final IndexShard indexShard; + private final WriteRequest request; + private final ESLogger logger; + + AsyncAfterWriteAction(final IndexShard indexShard, + final WriteRequest request, + @Nullable final Translog.Location location, + final RespondingWriteResult respond, + final ESLogger logger) { + this.indexShard = indexShard; + this.request = request; + boolean waitUntilRefresh = false; + switch (request.getRefreshPolicy()) { + case IMMEDIATE: + indexShard.refresh("refresh_flag_index"); + refreshed.set(true); + break; + case WAIT_UNTIL: + if (location != null) { + waitUntilRefresh = true; + pendingOps.incrementAndGet(); + } + break; + case NONE: + break; + default: + throw new IllegalArgumentException("unknown refresh policy: " + request.getRefreshPolicy()); + } + this.waitUntilRefresh = waitUntilRefresh; + this.respond = respond; + this.location = location; + if ((sync = indexShard.getTranslogDurability() == Translog.Durability.REQUEST && location != null)) { + pendingOps.incrementAndGet(); + } + this.logger = logger; + assert pendingOps.get() >= 0 && pendingOps.get() <= 3 : "pendingOpts was: " + pendingOps.get(); + } + + /** calls the response listener if all pending operations have returned otherwise it just decrements the pending opts counter.*/ + private void maybeFinish() { + final int numPending = pendingOps.decrementAndGet(); + if (numPending == 0) { + if (syncFailure.get() != null) { + respond.onFailure(syncFailure.get()); + } else { + respond.onSuccess(refreshed.get()); } - break; - case NONE: - break; + } + assert numPending >= 0 && numPending <= 2: "numPending must either 2, 1 or 0 but was " + numPending ; } - boolean fsyncTranslog = indexShard.getTranslogDurability() == Translog.Durability.REQUEST && location != null; - if (fsyncTranslog) { - indexShard.sync(location); - } - indexShard.maybeFlush(); - if (pendingOps == false) { - respond.respondAfterAsyncAction(immediateRefresh); + + void run() { + // we either respond immediately ie. if we we don't fsync per request or wait for refresh + // OR we got an pass async operations on and wait for them to return to respond. + indexShard.maybeFlush(); + maybeFinish(); // decrement the pendingOpts by one, if there is nothing else to do we just respond with success. + if (waitUntilRefresh) { + assert pendingOps.get() > 0; + indexShard.addRefreshListener(location, forcedRefresh -> { + if (forcedRefresh) { + logger.warn("block_until_refresh request ran out of slots and forced a refresh: [{}]", request); + } + refreshed.set(forcedRefresh); + maybeFinish(); + }); + } + if (sync) { + assert pendingOps.get() > 0; + indexShard.sync(location, (ex) -> { + syncFailure.set(ex); + maybeFinish(); + }); + } } } } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java new file mode 100644 index 00000000000..d201cf94f93 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java @@ -0,0 +1,128 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.logging.ESLogger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.function.Consumer; + +/** + * This async IO processor allows to batch IO operations and have a single writer processing the write operations. + * This can be used to ensure that threads can continue with other work while the actual IO operation is still processed + * by a single worker. A worker in this context can be any caller of the {@link #put(Object, Consumer)} method since it will + * hijack a worker if nobody else is currently processing queued items. If the internal queue has reached it's capacity incoming threads + * might be blocked until other items are processed + */ +public abstract class AsyncIOProcessor { + private final ESLogger logger; + private final ArrayBlockingQueue>> queue; + private final Semaphore promiseSemaphore = new Semaphore(1); + + protected AsyncIOProcessor(ESLogger logger, int queueSize) { + this.logger = logger; + this.queue = new ArrayBlockingQueue<>(queueSize); + } + + /** + * Adds the given item to the queue. The listener is notified once the item is processed + */ + public final void put(Item item, Consumer listener) { + Objects.requireNonNull(item, "item must not be null"); + Objects.requireNonNull(listener, "listener must not be null"); + // the algorithm here tires to reduce the load on each individual caller. + // we try to have only one caller that processes pending items to disc while others just add to the queue but + // at the same time never overload the node by pushing too many items into the queue. + + // we first try make a promise that we are responsible for the processing + final boolean promised = promiseSemaphore.tryAcquire(); + final Tuple> itemTuple = new Tuple<>(item, listener); + if (promised == false) { + // in this case we are not responsible and can just block until there is space + try { + queue.put(new Tuple<>(item, listener)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + listener.accept(e); + } + } + + // here we have to try to make the promise again otherwise there is a race when a thread puts an entry without making the promise + // while we are draining that mean we might exit below too early in the while loop if the drainAndSync call is fast. + if (promised || promiseSemaphore.tryAcquire()) { + final List>> candidates = new ArrayList<>(); + try { + if (promised) { + // we are responsible for processing we don't need to add the tuple to the queue we can just add it to the candidates + candidates.add(itemTuple); + } + // since we made the promise to process we gotta do it here at least once + drainAndProcess(candidates); + } finally { + promiseSemaphore.release(); // now to ensure we are passing it on we release the promise so another thread can take over + } + while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) { + // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing + try { + drainAndProcess(candidates); + } finally { + promiseSemaphore.release(); + } + } + } + } + + private void drainAndProcess(List>> candidates) { + queue.drainTo(candidates); + processList(candidates); + candidates.clear(); + } + + private void processList(List>> candidates) { + Exception exception = null; + if (candidates.isEmpty() == false) { + try { + write(candidates); + } catch (Exception ex) { // if this fails we are in deep shit - fail the request + logger.debug("failed to write candidates", ex); + // this exception is passed to all listeners - we don't retry. if this doesn't work we are in deep shit + exception = ex; + } + } + for (Tuple> tuple : candidates) { + Consumer consumer = tuple.v2(); + try { + consumer.accept(exception); + } catch (Exception ex) { + logger.warn("failed to notify callback", ex); + } + } + } + + /** + * Writes or processes the items out or to disk. + */ + protected abstract void write(List>> candidates) throws IOException; +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4e4b8a3e27d..282209daa40 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -49,6 +49,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; @@ -62,6 +63,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.AsyncIOProcessor; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexNotFoundException; @@ -136,6 +138,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -1643,19 +1646,34 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return indexShardOperationsLock.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close } - /** - * Syncs the given location with the underlying storage unless already synced. - */ - public void sync(Translog.Location location) { - try { - final Engine engine = getEngine(); - engine.getTranslog().ensureSynced(location); - } catch (EngineClosedException ex) { - // that's fine since we already synced everything on engine close - this also is conform with the methods documentation - } catch (IOException ex) { // if this fails we are in deep shit - fail the request - logger.debug("failed to sync translog", ex); - throw new ElasticsearchException("failed to sync translog", ex); + private final AsyncIOProcessor translogSyncProcessor = new AsyncIOProcessor(logger, 1024) { + @Override + protected void write(List>> candidates) throws IOException { + try { + final Engine engine = getEngine(); + engine.getTranslog().ensureSynced(candidates.stream().map(Tuple::v1)); + } catch (EngineClosedException ex) { + // that's fine since we already synced everything on engine close - this also is conform with the methods + // documentation + } catch (IOException ex) { // if this fails we are in deep shit - fail the request + logger.debug("failed to sync translog", ex); + throw ex; + } } + }; + + /** + * Syncs the given location with the underlying storage unless already synced. This method might return immediately without + * actually fsyncing the location until the sync listener is called. Yet, unless there is already another thread fsyncing + * the transaction log the caller thread will be hijacked to run the fsync for all pending fsync operations. + * This method allows indexing threads to continue indexing without blocking on fsync calls. We ensure that there is only + * one thread blocking on the sync an all others can continue indexing. + * NOTE: if the syncListener throws an exception when it's processed the exception will only be logged. Users should make sure that the + * listener handles all exception cases internally. + */ + public final void sync(Translog.Location location, Consumer syncListener) { + verifyNotClosed(); + translogSyncProcessor.put(location, syncListener); } /** diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 7afcb8a558a..68a1dd1aa36 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -54,6 +54,7 @@ import java.nio.file.StandardCopyOption; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -562,6 +563,24 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return false; } + /** + * Ensures that all locations in the given stream have been synced / written to the underlying storage. + * This method allows for internal optimization to minimize the amout of fsync operations if multiple + * locations must be synced. + * + * @return Returns true iff this call caused an actual sync operation otherwise false + */ + public boolean ensureSynced(Stream locations) throws IOException { + final Optional max = locations.max(Location::compareTo); + // we only need to sync the max location since it will sync all other + // locations implicitly + if (max.isPresent()) { + return ensureSynced(max.get()); + } else { + return false; + } + } + private void closeOnTragicEvent(Exception ex) { if (current.getTragicException() != null) { try { @@ -765,7 +784,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC /** * Reads the type and the operation from the given stream. The operatino must be written with - * {@link #writeType(Operation, StreamOutput)} + * {@link Operation#writeType(Operation, StreamOutput)} */ static Operation readType(StreamInput input) throws IOException { Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte()); diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java new file mode 100644 index 00000000000..642737b0744 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java @@ -0,0 +1,160 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +public class AsyncIOProcessorTests extends ESTestCase { + + public void testPut() throws InterruptedException { + boolean blockInternal = randomBoolean(); + AtomicInteger received = new AtomicInteger(0); + AsyncIOProcessor processor = new AsyncIOProcessor(logger, scaledRandomIntBetween(1, 2024)) { + @Override + protected void write(List>> candidates) throws IOException { + if (blockInternal) { + synchronized (this) { + for (Tuple> c :candidates) { + received.incrementAndGet(); + } + } + } else { + received.addAndGet(candidates.size()); + } + } + }; + Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + final int count = randomIntBetween(1000, 20000); + Thread[] thread = new Thread[randomIntBetween(3, 10)]; + CountDownLatch latch = new CountDownLatch(thread.length); + for (int i = 0; i < thread.length; i++) { + thread[i] = new Thread() { + @Override + public void run() { + try { + latch.countDown(); + latch.await(); + for (int i = 0; i < count; i++) { + semaphore.acquire(); + processor.put(new Object(), (ex) -> semaphore.release()); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }; + }; + thread[i].start(); + } + + for (int i = 0; i < thread.length; i++) { + thread[i].join(); + } + semaphore.acquire(Integer.MAX_VALUE); + assertEquals(count * thread.length, received.get()); + } + + public void testRandomFail() throws InterruptedException { + AtomicInteger received = new AtomicInteger(0); + AtomicInteger failed = new AtomicInteger(0); + AtomicInteger actualFailed = new AtomicInteger(0); + AsyncIOProcessor processor = new AsyncIOProcessor(logger, scaledRandomIntBetween(1, 2024)) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + if (randomBoolean()) { + failed.addAndGet(candidates.size()); + if (randomBoolean()) { + throw new IOException(); + } else { + throw new RuntimeException(); + } + } + } + }; + Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + final int count = randomIntBetween(1000, 20000); + Thread[] thread = new Thread[randomIntBetween(3, 10)]; + CountDownLatch latch = new CountDownLatch(thread.length); + for (int i = 0; i < thread.length; i++) { + thread[i] = new Thread() { + @Override + public void run() { + try { + latch.countDown(); + latch.await(); + for (int i = 0; i < count; i++) { + semaphore.acquire(); + processor.put(new Object(), (ex) -> { + if (ex != null) { + actualFailed.incrementAndGet(); + } + semaphore.release(); + }); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }; + }; + thread[i].start(); + } + + for (int i = 0; i < thread.length; i++) { + thread[i].join(); + } + semaphore.acquire(Integer.MAX_VALUE); + assertEquals(count * thread.length, received.get()); + assertEquals(actualFailed.get(), failed.get()); + } + + public void testConsumerCanThrowExceptions() { + AtomicInteger received = new AtomicInteger(0); + AtomicInteger notified = new AtomicInteger(0); + + AsyncIOProcessor processor = new AsyncIOProcessor(logger, scaledRandomIntBetween(1, 2024)) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + } + }; + processor.put(new Object(), (e) -> {notified.incrementAndGet();throw new RuntimeException();}); + processor.put(new Object(), (e) -> {notified.incrementAndGet();throw new RuntimeException();}); + assertEquals(2, notified.get()); + assertEquals(2, received.get()); + } + + public void testNullArguments() { + AsyncIOProcessor processor = new AsyncIOProcessor(logger, scaledRandomIntBetween(1, 2024)) { + @Override + protected void write(List>> candidates) throws IOException { + } + }; + + expectThrows(NullPointerException.class, () -> processor.put(null, (e) -> {})); + expectThrows(NullPointerException.class, () -> processor.put(new Object(), null)); + } +} diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 061a978b019..2cf121eb9dd 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -100,6 +100,7 @@ import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogTests; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; @@ -135,6 +136,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -581,6 +583,41 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertTrue(shard.getEngine().getTranslog().syncNeeded()); } + public void testAsyncFsync() throws InterruptedException { + createIndex("test"); + ensureGreen(); + client().prepareIndex("test", "bar", "1").setSource("{}").get(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService(resolveIndex("test")); + IndexShard shard = test.getShardOrNull(0); + Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + Thread[] thread = new Thread[randomIntBetween(3, 5)]; + CountDownLatch latch = new CountDownLatch(thread.length); + for (int i = 0; i < thread.length; i++) { + thread[i] = new Thread() { + @Override + public void run() { + try { + latch.countDown(); + latch.await(); + for (int i = 0; i < 10000; i++) { + semaphore.acquire(); + shard.sync(TranslogTests.randomTranslogLocation(), (ex) -> semaphore.release()); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }; + }; + thread[i].start(); + } + + for (int i = 0; i < thread.length; i++) { + thread[i].join(); + } + semaphore.acquire(Integer.MAX_VALUE); + } + private void setDurability(IndexShard shard, Translog.Durability durability) { client().admin().indices().prepareUpdateSettings(shard.shardId.getIndexName()).setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability.name()).build()).get(); assertEquals(durability, shard.getTranslogDurability()); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 54723807019..08b4d8ac71e 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -810,6 +810,38 @@ public class TranslogTests extends ESTestCase { } } + public void testSyncUpToStream() throws IOException { + int iters = randomIntBetween(5, 10); + for (int i = 0; i < iters; i++) { + int translogOperations = randomIntBetween(10, 100); + int count = 0; + ArrayList locations = new ArrayList<>(); + for (int op = 0; op < translogOperations; op++) { + if (rarely()) { + translog.commit(); // do this first so that there is at least one pending tlog entry + } + final Translog.Location location = translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))); + locations.add(location); + } + Collections.shuffle(locations, random()); + if (randomBoolean()) { + assertTrue("at least one operation pending", translog.syncNeeded()); + assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream())); + assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced + } else if (rarely()) { + translog.commit(); + assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); // not syncing now + assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); + } else { + translog.sync(); + assertFalse("translog has been synced already", translog.ensureSynced(locations.stream())); + } + for (Location location : locations) { + assertFalse("all of the locations should be synced: " + location, translog.ensureSynced(location)); + } + } + } + public void testLocationComparison() throws IOException { List locations = new ArrayList<>(); int translogOperations = randomIntBetween(10, 100);