Force Refresh Listeners when Acquiring all Operation Permits (#36835)
* Fixes the issue reproduced in the added tests: * When having open index requests on a shard that are waiting for a refresh, relocating that shard becomes blocked until that refresh happens (which could be never as in the test scenario).
This commit is contained in:
parent
c1beb95aa1
commit
4ac8fc6906
|
@ -607,8 +607,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer)
|
||||
throws IllegalIndexShardStateException, InterruptedException {
|
||||
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
|
||||
final Releasable forceRefreshes = refreshListeners.forceRefreshes();
|
||||
try {
|
||||
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
|
||||
forceRefreshes.close();
|
||||
// no shard operation permits are being held here, move state from started to relocated
|
||||
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
|
||||
"in-flight operations in progress while moving shard state to relocated";
|
||||
|
@ -639,6 +641,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
// Fail primary relocation source and target shards.
|
||||
failShard("timed out waiting for relocation hand-off to complete", null);
|
||||
throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete");
|
||||
} finally {
|
||||
forceRefreshes.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2339,7 +2343,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
verifyNotClosed();
|
||||
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;
|
||||
|
||||
indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
|
||||
asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
|
||||
}
|
||||
|
||||
private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
|
||||
final Releasable forceRefreshes = refreshListeners.forceRefreshes();
|
||||
final ActionListener<Releasable> wrappedListener = ActionListener.wrap(r -> {
|
||||
forceRefreshes.close();
|
||||
onPermitAcquired.onResponse(r);
|
||||
}, e -> {
|
||||
forceRefreshes.close();
|
||||
onPermitAcquired.onFailure(e);
|
||||
});
|
||||
try {
|
||||
indexShardOperationPermits.asyncBlockOperations(wrappedListener, timeout, timeUnit);
|
||||
} catch (Exception e) {
|
||||
forceRefreshes.close();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
|
||||
|
@ -2349,7 +2370,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null);
|
||||
assert operationPrimaryTerm <= pendingPrimaryTerm;
|
||||
final CountDownLatch termUpdated = new CountDownLatch(1);
|
||||
indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
|
||||
asyncBlockOperations(new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
try {
|
||||
|
@ -2442,8 +2463,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final ActionListener<Releasable> onPermitAcquired,
|
||||
final TimeValue timeout) {
|
||||
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, true,
|
||||
(listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()));
|
||||
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes,
|
||||
onPermitAcquired, true,
|
||||
listener -> asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit())
|
||||
);
|
||||
}
|
||||
|
||||
private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
|
||||
|
|
|
@ -295,7 +295,7 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
/**
|
||||
* Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight).
|
||||
*
|
||||
* @return the active operation count, or zero when all permits ar eheld
|
||||
* @return the active operation count, or zero when all permits are held
|
||||
*/
|
||||
int getActiveOperationsCount() {
|
||||
int availablePermits = semaphore.availablePermits();
|
||||
|
|
|
@ -22,6 +22,8 @@ package org.elasticsearch.index.shard;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.search.ReferenceManager;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.util.concurrent.RunOnce;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
|
||||
|
@ -53,6 +55,13 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
|
|||
* Is this closed? If true then we won't add more listeners and have flushed all pending listeners.
|
||||
*/
|
||||
private volatile boolean closed = false;
|
||||
|
||||
/**
|
||||
* Force-refreshes new refresh listeners that are added while {@code >= 0}. Used to prevent becoming blocked on operations waiting for
|
||||
* refresh during relocation.
|
||||
*/
|
||||
private int refreshForcers;
|
||||
|
||||
/**
|
||||
* List of refresh listeners. Defaults to null and built on demand because most refresh cycles won't need it. Entries are never removed
|
||||
* from it, rather, it is nulled and rebuilt when needed again. The (hopefully) rare entries that didn't make the current refresh cycle
|
||||
|
@ -75,6 +84,32 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
|
|||
this.threadContext = threadContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Force-refreshes newly added listeners and forces a refresh if there are currently listeners registered. See {@link #refreshForcers}.
|
||||
*/
|
||||
public Releasable forceRefreshes() {
|
||||
synchronized (this) {
|
||||
assert refreshForcers >= 0;
|
||||
refreshForcers += 1;
|
||||
}
|
||||
final RunOnce runOnce = new RunOnce(() -> {
|
||||
synchronized (RefreshListeners.this) {
|
||||
assert refreshForcers > 0;
|
||||
refreshForcers -= 1;
|
||||
}
|
||||
});
|
||||
if (refreshNeeded()) {
|
||||
try {
|
||||
forceRefresh.run();
|
||||
} catch (Exception e) {
|
||||
runOnce.run();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
assert refreshListeners == null;
|
||||
return () -> runOnce.run();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a listener for refreshes, calling it immediately if the location is already visible. If this runs out of listener slots then it
|
||||
* forces a refresh and calls the listener immediately as well.
|
||||
|
@ -102,7 +137,7 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
|
|||
listeners = new ArrayList<>();
|
||||
refreshListeners = listeners;
|
||||
}
|
||||
if (listeners.size() < getMaxRefreshListeners.getAsInt()) {
|
||||
if (refreshForcers == 0 && listeners.size() < getMaxRefreshListeners.getAsInt()) {
|
||||
ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true);
|
||||
Consumer<Boolean> contextPreservingListener = forced -> {
|
||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -342,6 +343,40 @@ public class RefreshListenersTests extends ESTestCase {
|
|||
refresher.cancel();
|
||||
}
|
||||
|
||||
public void testDisallowAddListeners() throws Exception {
|
||||
assertEquals(0, listeners.pendingCount());
|
||||
DummyRefreshListener listener = new DummyRefreshListener();
|
||||
assertFalse(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
|
||||
engine.refresh("I said so");
|
||||
assertFalse(listener.forcedRefresh.get());
|
||||
listener.assertNoError();
|
||||
|
||||
try (Releasable releaseable1 = listeners.forceRefreshes()) {
|
||||
listener = new DummyRefreshListener();
|
||||
assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
|
||||
assertTrue(listener.forcedRefresh.get());
|
||||
listener.assertNoError();
|
||||
assertEquals(0, listeners.pendingCount());
|
||||
|
||||
try (Releasable releaseable2 = listeners.forceRefreshes()) {
|
||||
listener = new DummyRefreshListener();
|
||||
assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
|
||||
assertTrue(listener.forcedRefresh.get());
|
||||
listener.assertNoError();
|
||||
assertEquals(0, listeners.pendingCount());
|
||||
}
|
||||
|
||||
listener = new DummyRefreshListener();
|
||||
assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
|
||||
assertTrue(listener.forcedRefresh.get());
|
||||
listener.assertNoError();
|
||||
assertEquals(0, listeners.pendingCount());
|
||||
}
|
||||
|
||||
assertFalse(listeners.addOrNotify(index("1").getTranslogLocation(), new DummyRefreshListener()));
|
||||
assertEquals(1, listeners.pendingCount());
|
||||
}
|
||||
|
||||
private Engine.IndexResult index(String id) throws IOException {
|
||||
return index(id, "test");
|
||||
}
|
||||
|
|
|
@ -23,9 +23,12 @@ import com.carrotsearch.hppc.IntHashSet;
|
|||
import com.carrotsearch.hppc.procedures.IntProcedure;
|
||||
import org.apache.lucene.index.IndexFileNames;
|
||||
import org.apache.lucene.util.English;
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -506,6 +509,97 @@ public class RelocationIT extends ESIntegTestCase {
|
|||
|
||||
}
|
||||
|
||||
public void testRelocateWhileWaitingForRefresh() {
|
||||
logger.info("--> starting [node1] ...");
|
||||
final String node1 = internalCluster().startNode();
|
||||
|
||||
logger.info("--> creating test index ...");
|
||||
prepareCreate("test", Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.refresh_interval", -1) // we want to control refreshes
|
||||
).get();
|
||||
|
||||
logger.info("--> index 10 docs");
|
||||
for (int i = 0; i < 10; i++) {
|
||||
client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
|
||||
}
|
||||
logger.info("--> flush so we have an actual index");
|
||||
client().admin().indices().prepareFlush().execute().actionGet();
|
||||
logger.info("--> index more docs so we have something in the translog");
|
||||
for (int i = 10; i < 20; i++) {
|
||||
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
|
||||
.setSource("field", "value" + i).execute();
|
||||
}
|
||||
|
||||
logger.info("--> start another node");
|
||||
final String node2 = internalCluster().startNode();
|
||||
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
|
||||
.setWaitForNodes("2").execute().actionGet();
|
||||
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
|
||||
|
||||
logger.info("--> relocate the shard from node1 to node2");
|
||||
client().admin().cluster().prepareReroute()
|
||||
.add(new MoveAllocationCommand("test", 0, node1, node2))
|
||||
.execute().actionGet();
|
||||
|
||||
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
|
||||
.setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
|
||||
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
|
||||
|
||||
logger.info("--> verifying count");
|
||||
client().admin().indices().prepareRefresh().execute().actionGet();
|
||||
assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));
|
||||
}
|
||||
|
||||
public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() {
|
||||
logger.info("--> starting [node1] ...");
|
||||
final String node1 = internalCluster().startNode();
|
||||
|
||||
logger.info("--> creating test index ...");
|
||||
prepareCreate("test", Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.refresh_interval", -1) // we want to control refreshes
|
||||
).get();
|
||||
|
||||
logger.info("--> index 10 docs");
|
||||
for (int i = 0; i < 10; i++) {
|
||||
client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
|
||||
}
|
||||
logger.info("--> flush so we have an actual index");
|
||||
client().admin().indices().prepareFlush().execute().actionGet();
|
||||
logger.info("--> index more docs so we have something in the translog");
|
||||
for (int i = 10; i < 20; i++) {
|
||||
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
|
||||
.setSource("field", "value" + i).execute();
|
||||
}
|
||||
|
||||
logger.info("--> start another node");
|
||||
final String node2 = internalCluster().startNode();
|
||||
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
|
||||
.setWaitForNodes("2").execute().actionGet();
|
||||
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
|
||||
|
||||
logger.info("--> relocate the shard from node1 to node2");
|
||||
ActionFuture<ClusterRerouteResponse> relocationListener = client().admin().cluster().prepareReroute()
|
||||
.add(new MoveAllocationCommand("test", 0, node1, node2))
|
||||
.execute();
|
||||
logger.info("--> index 100 docs while relocating");
|
||||
for (int i = 20; i < 120; i++) {
|
||||
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
|
||||
.setSource("field", "value" + i).execute();
|
||||
}
|
||||
relocationListener.actionGet();
|
||||
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
|
||||
.setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
|
||||
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
|
||||
|
||||
logger.info("--> verifying count");
|
||||
client().admin().indices().prepareRefresh().execute().actionGet();
|
||||
assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L));
|
||||
}
|
||||
|
||||
class RecoveryCorruption implements StubbableTransport.SendRequestBehavior {
|
||||
|
||||
private final CountDownLatch corruptionCount;
|
||||
|
|
Loading…
Reference in New Issue