diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 9261bea945c..c472c7454ab 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -39,6 +39,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; /** @@ -128,29 +131,38 @@ public abstract class TransportWriteAction< * We call this before replication because this might wait for a refresh and that can take a while. This way we wait for the * refresh in parallel on the primary and on the replica. */ - postWriteActions(indexShard, request, location, this, logger); + new AsyncAfterWriteAction(indexShard, request, location, this, logger).run(); } @Override public synchronized void respond(ActionListener listener) { this.listener = listener; - respondIfPossible(); + respondIfPossible(null); } /** * Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}. 
*/ - protected void respondIfPossible() { + protected void respondIfPossible(Exception ex) { if (finishedAsyncActions && listener != null) { - super.respond(listener); + if (ex == null) { + super.respond(listener); + } else { + listener.onFailure(ex); + } } } + public synchronized void onFailure(Exception exception) { + finishedAsyncActions = true; + respondIfPossible(exception); + } + @Override - public synchronized void respondAfterAsyncAction(boolean forcedRefresh) { + public synchronized void onSuccess(boolean forcedRefresh) { finalResponse.setForcedRefresh(forcedRefresh); finishedAsyncActions = true; - respondIfPossible(); + respondIfPossible(null); } } @@ -162,68 +174,144 @@ public abstract class TransportWriteAction< private ActionListener listener; public WriteReplicaResult(IndexShard indexShard, ReplicatedWriteRequest request, Translog.Location location) { - postWriteActions(indexShard, request, location, this, logger); + new AsyncAfterWriteAction(indexShard, request, location, this, logger).run(); } @Override public void respond(ActionListener listener) { this.listener = listener; - respondIfPossible(); + respondIfPossible(null); } /** * Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}. 
*/ - protected void respondIfPossible() { + protected void respondIfPossible(Exception ex) { if (finishedAsyncActions && listener != null) { - super.respond(listener); + if (ex == null) { + super.respond(listener); + } else { + listener.onFailure(ex); + } } } @Override - public synchronized void respondAfterAsyncAction(boolean forcedRefresh) { + public synchronized void onFailure(Exception ex) { finishedAsyncActions = true; - respondIfPossible(); + respondIfPossible(ex); + } + + @Override + public synchronized void onSuccess(boolean forcedRefresh) { + finishedAsyncActions = true; + respondIfPossible(null); } } + /** + * callback used by {@link AsyncAfterWriteAction} to notify that all post + * process actions have been executed + */ private interface RespondingWriteResult { - void respondAfterAsyncAction(boolean forcedRefresh); + /** + * Called on successful processing of all post write actions + * @param forcedRefresh true iff this write has caused a refresh + */ + void onSuccess(boolean forcedRefresh); + + /** + * Called on failure if a post action failed. + */ + void onFailure(Exception ex); } - static void postWriteActions(final IndexShard indexShard, - final WriteRequest request, - @Nullable final Translog.Location location, - final RespondingWriteResult respond, - final ESLogger logger) { - boolean pendingOps = false; - boolean immediateRefresh = false; - switch (request.getRefreshPolicy()) { - case IMMEDIATE: - indexShard.refresh("refresh_flag_index"); - immediateRefresh = true; - break; - case WAIT_UNTIL: - if (location != null) { - pendingOps = true; - indexShard.addRefreshListener(location, forcedRefresh -> { - if (forcedRefresh) { - logger.warn("block_until_refresh request ran out of slots and forced a refresh: [{}]", request); - } - respond.respondAfterAsyncAction(forcedRefresh); - }); + /** + * This class encapsulates post write actions like async waits for + * translog syncs or waiting for a refresh to happen making the write operation + * visible.
+ */ + static final class AsyncAfterWriteAction { + private final Location location; + private final boolean waitUntilRefresh; + private final boolean sync; + private final AtomicInteger pendingOps = new AtomicInteger(1); + private final AtomicBoolean refreshed = new AtomicBoolean(false); + private final AtomicReference syncFailure = new AtomicReference<>(null); + private final RespondingWriteResult respond; + private final IndexShard indexShard; + private final WriteRequest request; + private final ESLogger logger; + + AsyncAfterWriteAction(final IndexShard indexShard, + final WriteRequest request, + @Nullable final Translog.Location location, + final RespondingWriteResult respond, + final ESLogger logger) { + this.indexShard = indexShard; + this.request = request; + boolean waitUntilRefresh = false; + switch (request.getRefreshPolicy()) { + case IMMEDIATE: + indexShard.refresh("refresh_flag_index"); + refreshed.set(true); + break; + case WAIT_UNTIL: + if (location != null) { + waitUntilRefresh = true; + pendingOps.incrementAndGet(); + } + break; + case NONE: + break; + default: + throw new IllegalArgumentException("unknown refresh policy: " + request.getRefreshPolicy()); + } + this.waitUntilRefresh = waitUntilRefresh; + this.respond = respond; + this.location = location; + if ((sync = indexShard.getTranslogDurability() == Translog.Durability.REQUEST && location != null)) { + pendingOps.incrementAndGet(); + } + this.logger = logger; + assert pendingOps.get() >= 0 && pendingOps.get() <= 3 : "pendingOpts was: " + pendingOps.get(); + } + + /** calls the response listener if all pending operations have returned otherwise it just decrements the pending opts counter.*/ + private void maybeFinish() { + final int numPending = pendingOps.decrementAndGet(); + if (numPending == 0) { + if (syncFailure.get() != null) { + respond.onFailure(syncFailure.get()); + } else { + respond.onSuccess(refreshed.get()); } - break; - case NONE: - break; + } + assert numPending >= 0 && 
numPending <= 2: "numPending must either 2, 1 or 0 but was " + numPending ; } - boolean fsyncTranslog = indexShard.getTranslogDurability() == Translog.Durability.REQUEST && location != null; - if (fsyncTranslog) { - indexShard.sync(location); - } - indexShard.maybeFlush(); - if (pendingOps == false) { - respond.respondAfterAsyncAction(immediateRefresh); + + void run() { + // we either respond immediately, i.e. if we don't fsync per request or wait for refresh, + // OR we pass the async operations on and wait for them to return before we respond. + indexShard.maybeFlush(); + maybeFinish(); // decrement pendingOps by one, if there is nothing else to do we just respond with success. + if (waitUntilRefresh) { + assert pendingOps.get() > 0; + indexShard.addRefreshListener(location, forcedRefresh -> { + if (forcedRefresh) { + logger.warn("block_until_refresh request ran out of slots and forced a refresh: [{}]", request); + } + refreshed.set(forcedRefresh); + maybeFinish(); + }); + } + if (sync) { + assert pendingOps.get() > 0; + indexShard.sync(location, (ex) -> { + syncFailure.set(ex); + maybeFinish(); + }); + } } } } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java new file mode 100644 index 00000000000..d201cf94f93 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java @@ -0,0 +1,128 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.logging.ESLogger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.function.Consumer; + +/** + * This async IO processor allows to batch IO operations and have a single writer processing the write operations. + * This can be used to ensure that threads can continue with other work while the actual IO operation is still processed + * by a single worker. A worker in this context can be any caller of the {@link #put(Object, Consumer)} method since it will + * hijack a worker if nobody else is currently processing queued items. If the internal queue has reached its capacity, incoming threads + * might be blocked until other items are processed. + */ +public abstract class AsyncIOProcessor { + private final ESLogger logger; + private final ArrayBlockingQueue>> queue; + private final Semaphore promiseSemaphore = new Semaphore(1); + + protected AsyncIOProcessor(ESLogger logger, int queueSize) { + this.logger = logger; + this.queue = new ArrayBlockingQueue<>(queueSize); + } + + /** + * Adds the given item to the queue.
The listener is notified once the item is processed + */ + public final void put(Item item, Consumer listener) { + Objects.requireNonNull(item, "item must not be null"); + Objects.requireNonNull(listener, "listener must not be null"); + // the algorithm here tries to reduce the load on each individual caller. + // we try to have only one caller that processes pending items to disc while others just add to the queue but + // at the same time never overload the node by pushing too many items into the queue. + + // we first try to make a promise that we are responsible for the processing + final boolean promised = promiseSemaphore.tryAcquire(); + final Tuple> itemTuple = new Tuple<>(item, listener); + if (promised == false) { + // in this case we are not responsible and can just block until there is space + try { + queue.put(itemTuple); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + listener.accept(e); + } + } + + // here we have to try to make the promise again otherwise there is a race when a thread puts an entry without making the promise + // while we are draining, which means we might exit below too early in the while loop if the drainAndProcess call is fast.
+ if (promised || promiseSemaphore.tryAcquire()) { + final List>> candidates = new ArrayList<>(); + try { + if (promised) { + // we are responsible for processing we don't need to add the tuple to the queue we can just add it to the candidates + candidates.add(itemTuple); + } + // since we made the promise to process we gotta do it here at least once + drainAndProcess(candidates); + } finally { + promiseSemaphore.release(); // now to ensure we are passing it on we release the promise so another thread can take over + } + while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) { + // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing + try { + drainAndProcess(candidates); + } finally { + promiseSemaphore.release(); + } + } + } + } + + private void drainAndProcess(List>> candidates) { + queue.drainTo(candidates); + processList(candidates); + candidates.clear(); + } + + private void processList(List>> candidates) { + Exception exception = null; + if (candidates.isEmpty() == false) { + try { + write(candidates); + } catch (Exception ex) { // if this fails we are in deep shit - fail the request + logger.debug("failed to write candidates", ex); + // this exception is passed to all listeners - we don't retry. if this doesn't work we are in deep shit + exception = ex; + } + } + for (Tuple> tuple : candidates) { + Consumer consumer = tuple.v2(); + try { + consumer.accept(exception); + } catch (Exception ex) { + logger.warn("failed to notify callback", ex); + } + } + } + + /** + * Writes or processes the items out or to disk. 
+ */ + protected abstract void write(List>> candidates) throws IOException; +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4e4b8a3e27d..282209daa40 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -49,6 +49,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; @@ -62,6 +63,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.AsyncIOProcessor; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexNotFoundException; @@ -136,6 +138,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -1643,19 +1646,34 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return indexShardOperationsLock.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close } - /** - * Syncs the given location with the underlying storage unless already synced. 
- */ - public void sync(Translog.Location location) { - try { - final Engine engine = getEngine(); - engine.getTranslog().ensureSynced(location); - } catch (EngineClosedException ex) { - // that's fine since we already synced everything on engine close - this also is conform with the methods documentation - } catch (IOException ex) { // if this fails we are in deep shit - fail the request - logger.debug("failed to sync translog", ex); - throw new ElasticsearchException("failed to sync translog", ex); + private final AsyncIOProcessor translogSyncProcessor = new AsyncIOProcessor(logger, 1024) { + @Override + protected void write(List>> candidates) throws IOException { + try { + final Engine engine = getEngine(); + engine.getTranslog().ensureSynced(candidates.stream().map(Tuple::v1)); + } catch (EngineClosedException ex) { + // that's fine since we already synced everything on engine close - this also is conform with the methods + // documentation + } catch (IOException ex) { // if this fails we are in deep shit - fail the request + logger.debug("failed to sync translog", ex); + throw ex; + } } + }; + + /** + * Syncs the given location with the underlying storage unless already synced. This method might return immediately without + * actually fsyncing the location until the sync listener is called. Yet, unless there is already another thread fsyncing + * the transaction log the caller thread will be hijacked to run the fsync for all pending fsync operations. + * This method allows indexing threads to continue indexing without blocking on fsync calls. We ensure that there is only + * one thread blocking on the sync and all others can continue indexing. + * NOTE: if the syncListener throws an exception when it's processed the exception will only be logged. Users should make sure that the + * listener handles all exception cases internally.
+ */ + public final void sync(Translog.Location location, Consumer syncListener) { + verifyNotClosed(); + translogSyncProcessor.put(location, syncListener); } /** diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 7afcb8a558a..68a1dd1aa36 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -54,6 +54,7 @@ import java.nio.file.StandardCopyOption; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -562,6 +563,24 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return false; } + /** + * Ensures that all locations in the given stream have been synced / written to the underlying storage. + * This method allows for internal optimization to minimize the amount of fsync operations if multiple + * locations must be synced. + * + * @return Returns true iff this call caused an actual sync operation otherwise false + */ + public boolean ensureSynced(Stream locations) throws IOException { + final Optional max = locations.max(Location::compareTo); + // we only need to sync the max location since it will sync all other + // locations implicitly + if (max.isPresent()) { + return ensureSynced(max.get()); + } else { + return false; + } + } + private void closeOnTragicEvent(Exception ex) { if (current.getTragicException() != null) { try { @@ -765,7 +784,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC /** * Reads the type and the operation from the given stream.
The operatino must be written with - * {@link #writeType(Operation, StreamOutput)} + * {@link Operation#writeType(Operation, StreamOutput)} */ static Operation readType(StreamInput input) throws IOException { Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte()); diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java new file mode 100644 index 00000000000..642737b0744 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java @@ -0,0 +1,160 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +public class AsyncIOProcessorTests extends ESTestCase { + + public void testPut() throws InterruptedException { + boolean blockInternal = randomBoolean(); + AtomicInteger received = new AtomicInteger(0); + AsyncIOProcessor processor = new AsyncIOProcessor(logger, scaledRandomIntBetween(1, 2024)) { + @Override + protected void write(List>> candidates) throws IOException { + if (blockInternal) { + synchronized (this) { + for (Tuple> c :candidates) { + received.incrementAndGet(); + } + } + } else { + received.addAndGet(candidates.size()); + } + } + }; + Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + final int count = randomIntBetween(1000, 20000); + Thread[] thread = new Thread[randomIntBetween(3, 10)]; + CountDownLatch latch = new CountDownLatch(thread.length); + for (int i = 0; i < thread.length; i++) { + thread[i] = new Thread() { + @Override + public void run() { + try { + latch.countDown(); + latch.await(); + for (int i = 0; i < count; i++) { + semaphore.acquire(); + processor.put(new Object(), (ex) -> semaphore.release()); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }; + }; + thread[i].start(); + } + + for (int i = 0; i < thread.length; i++) { + thread[i].join(); + } + semaphore.acquire(Integer.MAX_VALUE); + assertEquals(count * thread.length, received.get()); + } + + public void testRandomFail() throws InterruptedException { + AtomicInteger received = new AtomicInteger(0); + AtomicInteger failed = new AtomicInteger(0); + AtomicInteger actualFailed = new AtomicInteger(0); + AsyncIOProcessor processor = new AsyncIOProcessor(logger, 
scaledRandomIntBetween(1, 2024)) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + if (randomBoolean()) { + failed.addAndGet(candidates.size()); + if (randomBoolean()) { + throw new IOException(); + } else { + throw new RuntimeException(); + } + } + } + }; + Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + final int count = randomIntBetween(1000, 20000); + Thread[] thread = new Thread[randomIntBetween(3, 10)]; + CountDownLatch latch = new CountDownLatch(thread.length); + for (int i = 0; i < thread.length; i++) { + thread[i] = new Thread() { + @Override + public void run() { + try { + latch.countDown(); + latch.await(); + for (int i = 0; i < count; i++) { + semaphore.acquire(); + processor.put(new Object(), (ex) -> { + if (ex != null) { + actualFailed.incrementAndGet(); + } + semaphore.release(); + }); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }; + }; + thread[i].start(); + } + + for (int i = 0; i < thread.length; i++) { + thread[i].join(); + } + semaphore.acquire(Integer.MAX_VALUE); + assertEquals(count * thread.length, received.get()); + assertEquals(actualFailed.get(), failed.get()); + } + + public void testConsumerCanThrowExceptions() { + AtomicInteger received = new AtomicInteger(0); + AtomicInteger notified = new AtomicInteger(0); + + AsyncIOProcessor processor = new AsyncIOProcessor(logger, scaledRandomIntBetween(1, 2024)) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + } + }; + processor.put(new Object(), (e) -> {notified.incrementAndGet();throw new RuntimeException();}); + processor.put(new Object(), (e) -> {notified.incrementAndGet();throw new RuntimeException();}); + assertEquals(2, notified.get()); + assertEquals(2, received.get()); + } + + public void testNullArguments() { + AsyncIOProcessor processor = new AsyncIOProcessor(logger, scaledRandomIntBetween(1, 2024)) { + 
@Override + protected void write(List>> candidates) throws IOException { + } + }; + + expectThrows(NullPointerException.class, () -> processor.put(null, (e) -> {})); + expectThrows(NullPointerException.class, () -> processor.put(new Object(), null)); + } +} diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 061a978b019..2cf121eb9dd 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -100,6 +100,7 @@ import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogTests; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; @@ -135,6 +136,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -581,6 +583,41 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertTrue(shard.getEngine().getTranslog().syncNeeded()); } + public void testAsyncFsync() throws InterruptedException { + createIndex("test"); + ensureGreen(); + client().prepareIndex("test", "bar", "1").setSource("{}").get(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService(resolveIndex("test")); + IndexShard shard = test.getShardOrNull(0); + Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + Thread[] 
thread = new Thread[randomIntBetween(3, 5)]; + CountDownLatch latch = new CountDownLatch(thread.length); + for (int i = 0; i < thread.length; i++) { + thread[i] = new Thread() { + @Override + public void run() { + try { + latch.countDown(); + latch.await(); + for (int i = 0; i < 10000; i++) { + semaphore.acquire(); + shard.sync(TranslogTests.randomTranslogLocation(), (ex) -> semaphore.release()); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }; + }; + thread[i].start(); + } + + for (int i = 0; i < thread.length; i++) { + thread[i].join(); + } + semaphore.acquire(Integer.MAX_VALUE); + } + private void setDurability(IndexShard shard, Translog.Durability durability) { client().admin().indices().prepareUpdateSettings(shard.shardId.getIndexName()).setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability.name()).build()).get(); assertEquals(durability, shard.getTranslogDurability()); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 54723807019..08b4d8ac71e 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -810,6 +810,38 @@ public class TranslogTests extends ESTestCase { } } + public void testSyncUpToStream() throws IOException { + int iters = randomIntBetween(5, 10); + for (int i = 0; i < iters; i++) { + int translogOperations = randomIntBetween(10, 100); + int count = 0; + ArrayList locations = new ArrayList<>(); + for (int op = 0; op < translogOperations; op++) { + if (rarely()) { + translog.commit(); // do this first so that there is at least one pending tlog entry + } + final Translog.Location location = translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))); + locations.add(location); + } + 
Collections.shuffle(locations, random()); + if (randomBoolean()) { + assertTrue("at least one operation pending", translog.syncNeeded()); + assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream())); + assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced + } else if (rarely()) { + translog.commit(); + assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); // not syncing now + assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); + } else { + translog.sync(); + assertFalse("translog has been synced already", translog.ensureSynced(locations.stream())); + } + for (Location location : locations) { + assertFalse("all of the locations should be synced: " + location, translog.ensureSynced(location)); + } + } + } + public void testLocationComparison() throws IOException { List locations = new ArrayList<>(); int translogOperations = randomIntBetween(10, 100);