Add refresh stats tracking for realtime get (#25052)
Passes a `LongConsumer` into the `Engine` during GETs which the engine calls if it refreshed to perform the get. Closes #24806
This commit is contained in:
parent
d6d0c13bd6
commit
40a13345d7
|
@ -89,6 +89,7 @@ import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
import java.util.function.LongConsumer;
|
||||||
|
|
||||||
public abstract class Engine implements Closeable {
|
public abstract class Engine implements Closeable {
|
||||||
|
|
||||||
|
@ -485,11 +486,7 @@ public abstract class Engine implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public final GetResult get(Get get) throws EngineException {
|
public abstract GetResult get(Get get, Function<String, Searcher> searcherFactory, LongConsumer onRefresh) throws EngineException;
|
||||||
return get(get, this::acquireSearcher);
|
|
||||||
}
|
|
||||||
|
|
||||||
public abstract GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a new searcher instance. The consumer of this
|
* Returns a new searcher instance. The consumer of this
|
||||||
|
|
|
@ -92,6 +92,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
import java.util.function.LongConsumer;
|
||||||
import java.util.function.LongSupplier;
|
import java.util.function.LongSupplier;
|
||||||
|
|
||||||
public class InternalEngine extends Engine {
|
public class InternalEngine extends Engine {
|
||||||
|
@ -404,7 +405,7 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException {
|
public GetResult get(Get get, Function<String, Searcher> searcherFactory, LongConsumer onRefresh) throws EngineException {
|
||||||
assert Objects.equals(get.uid().field(), uidField) : get.uid().field();
|
assert Objects.equals(get.uid().field(), uidField) : get.uid().field();
|
||||||
try (ReleasableLock ignored = readLock.acquire()) {
|
try (ReleasableLock ignored = readLock.acquire()) {
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
|
@ -418,7 +419,9 @@ public class InternalEngine extends Engine {
|
||||||
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
|
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
|
||||||
get.versionType().explainConflictForReads(versionValue.version, get.version()));
|
get.versionType().explainConflictForReads(versionValue.version, get.version()));
|
||||||
}
|
}
|
||||||
|
long time = System.nanoTime();
|
||||||
refresh("realtime_get");
|
refresh("realtime_get");
|
||||||
|
onRefresh.accept(System.nanoTime() - time);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -661,7 +661,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
|
|
||||||
public Engine.GetResult get(Engine.Get get) {
|
public Engine.GetResult get(Engine.Get get) {
|
||||||
readAllowed();
|
readAllowed();
|
||||||
return getEngine().get(get, this::acquireSearcher);
|
return getEngine().get(get, this::acquireSearcher, (timeElapsed) -> refreshMetric.inc(timeElapsed));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -920,7 +920,10 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
engine.index(indexForDoc(doc));
|
engine.index(indexForDoc(doc));
|
||||||
|
|
||||||
final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>();
|
final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>();
|
||||||
latestGetResult.set(engine.get(newGet(true, doc)));
|
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
|
||||||
|
final AtomicBoolean refreshed = new AtomicBoolean(false);
|
||||||
|
latestGetResult.set(engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true)));
|
||||||
|
assertTrue("failed to refresh", refreshed.get());
|
||||||
final AtomicBoolean flushFinished = new AtomicBoolean(false);
|
final AtomicBoolean flushFinished = new AtomicBoolean(false);
|
||||||
final CyclicBarrier barrier = new CyclicBarrier(2);
|
final CyclicBarrier barrier = new CyclicBarrier(2);
|
||||||
Thread getThread = new Thread(() -> {
|
Thread getThread = new Thread(() -> {
|
||||||
|
@ -934,7 +937,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
if (previousGetResult != null) {
|
if (previousGetResult != null) {
|
||||||
previousGetResult.release();
|
previousGetResult.release();
|
||||||
}
|
}
|
||||||
latestGetResult.set(engine.get(newGet(true, doc)));
|
latestGetResult.set(engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done")));
|
||||||
if (latestGetResult.get().exists() == false) {
|
if (latestGetResult.get().exists() == false) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -954,6 +957,9 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
|
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
|
||||||
searchResult.close();
|
searchResult.close();
|
||||||
|
|
||||||
|
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
|
||||||
|
final AtomicBoolean refreshed = new AtomicBoolean(false);
|
||||||
|
|
||||||
// create a document
|
// create a document
|
||||||
Document document = testDocumentWithTextField();
|
Document document = testDocumentWithTextField();
|
||||||
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
|
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
|
||||||
|
@ -967,12 +973,13 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
searchResult.close();
|
searchResult.close();
|
||||||
|
|
||||||
// but, not there non realtime
|
// but, not there non realtime
|
||||||
Engine.GetResult getResult = engine.get(newGet(false, doc));
|
Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh"));
|
||||||
assertThat(getResult.exists(), equalTo(false));
|
assertThat(getResult.exists(), equalTo(false));
|
||||||
getResult.release();
|
getResult.release();
|
||||||
|
|
||||||
// but, we can still get it (in realtime)
|
// but, we can still get it (in realtime)
|
||||||
getResult = engine.get(newGet(true, doc));
|
getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true));
|
||||||
|
assertTrue("failed to refresh", refreshed.getAndSet(false));
|
||||||
assertThat(getResult.exists(), equalTo(true));
|
assertThat(getResult.exists(), equalTo(true));
|
||||||
assertThat(getResult.docIdAndVersion(), notNullValue());
|
assertThat(getResult.docIdAndVersion(), notNullValue());
|
||||||
getResult.release();
|
getResult.release();
|
||||||
|
@ -987,7 +994,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
searchResult.close();
|
searchResult.close();
|
||||||
|
|
||||||
// also in non realtime
|
// also in non realtime
|
||||||
getResult = engine.get(newGet(false, doc));
|
getResult = engine.get(newGet(false, doc), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh"));
|
||||||
assertThat(getResult.exists(), equalTo(true));
|
assertThat(getResult.exists(), equalTo(true));
|
||||||
assertThat(getResult.docIdAndVersion(), notNullValue());
|
assertThat(getResult.docIdAndVersion(), notNullValue());
|
||||||
getResult.release();
|
getResult.release();
|
||||||
|
@ -1007,7 +1014,8 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
searchResult.close();
|
searchResult.close();
|
||||||
|
|
||||||
// but, we can still get it (in realtime)
|
// but, we can still get it (in realtime)
|
||||||
getResult = engine.get(newGet(true, doc));
|
getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true));
|
||||||
|
assertTrue("failed to refresh", refreshed.get());
|
||||||
assertThat(getResult.exists(), equalTo(true));
|
assertThat(getResult.exists(), equalTo(true));
|
||||||
assertThat(getResult.docIdAndVersion(), notNullValue());
|
assertThat(getResult.docIdAndVersion(), notNullValue());
|
||||||
getResult.release();
|
getResult.release();
|
||||||
|
@ -1032,7 +1040,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
searchResult.close();
|
searchResult.close();
|
||||||
|
|
||||||
// but, get should not see it (in realtime)
|
// but, get should not see it (in realtime)
|
||||||
getResult = engine.get(newGet(true, doc));
|
getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted"));
|
||||||
assertThat(getResult.exists(), equalTo(false));
|
assertThat(getResult.exists(), equalTo(false));
|
||||||
getResult.release();
|
getResult.release();
|
||||||
|
|
||||||
|
@ -1072,7 +1080,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
engine.flush();
|
engine.flush();
|
||||||
|
|
||||||
// and, verify get (in real time)
|
// and, verify get (in real time)
|
||||||
getResult = engine.get(newGet(true, doc));
|
getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done"));
|
||||||
assertThat(getResult.exists(), equalTo(true));
|
assertThat(getResult.exists(), equalTo(true));
|
||||||
assertThat(getResult.docIdAndVersion(), notNullValue());
|
assertThat(getResult.docIdAndVersion(), notNullValue());
|
||||||
getResult.release();
|
getResult.release();
|
||||||
|
@ -1858,6 +1866,8 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null);
|
ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null);
|
||||||
final Term uidTerm = newUid(doc);
|
final Term uidTerm = newUid(doc);
|
||||||
engine.index(indexForDoc(doc));
|
engine.index(indexForDoc(doc));
|
||||||
|
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
|
||||||
|
final AtomicBoolean refreshed = new AtomicBoolean(false);
|
||||||
for (int i = 0; i < thread.length; i++) {
|
for (int i = 0; i < thread.length; i++) {
|
||||||
thread[i] = new Thread(() -> {
|
thread[i] = new Thread(() -> {
|
||||||
startGun.countDown();
|
startGun.countDown();
|
||||||
|
@ -1867,7 +1877,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
throw new AssertionError(e);
|
throw new AssertionError(e);
|
||||||
}
|
}
|
||||||
for (int op = 0; op < opsPerThread; op++) {
|
for (int op = 0; op < opsPerThread; op++) {
|
||||||
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm))) {
|
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) {
|
||||||
FieldsVisitor visitor = new FieldsVisitor(true);
|
FieldsVisitor visitor = new FieldsVisitor(true);
|
||||||
get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
|
get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
|
||||||
List<String> values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString()));
|
List<String> values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString()));
|
||||||
|
@ -1895,6 +1905,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
for (int i = 0; i < thread.length; i++) {
|
for (int i = 0; i < thread.length; i++) {
|
||||||
thread[i].join();
|
thread[i].join();
|
||||||
}
|
}
|
||||||
|
assertTrue("failed to refresh", refreshed.getAndSet(false));
|
||||||
List<OpAndVersion> sortedHistory = new ArrayList<>(history);
|
List<OpAndVersion> sortedHistory = new ArrayList<>(history);
|
||||||
sortedHistory.sort(Comparator.comparing(o -> o.version));
|
sortedHistory.sort(Comparator.comparing(o -> o.version));
|
||||||
Set<String> currentValues = new HashSet<>();
|
Set<String> currentValues = new HashSet<>();
|
||||||
|
@ -1909,7 +1920,8 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
assertTrue(op.added + " should not exist", exists);
|
assertTrue(op.added + " should not exist", exists);
|
||||||
}
|
}
|
||||||
|
|
||||||
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm))) {
|
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) {
|
||||||
|
assertTrue("failed to refresh", refreshed.get());
|
||||||
FieldsVisitor visitor = new FieldsVisitor(true);
|
FieldsVisitor visitor = new FieldsVisitor(true);
|
||||||
get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
|
get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
|
||||||
List<String> values = Arrays.asList(Strings.commaDelimitedListToStringArray(visitor.source().utf8ToString()));
|
List<String> values = Arrays.asList(Strings.commaDelimitedListToStringArray(visitor.source().utf8ToString()));
|
||||||
|
@ -2262,6 +2274,8 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
|
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
|
||||||
engine.config().setEnableGcDeletes(false);
|
engine.config().setEnableGcDeletes(false);
|
||||||
|
|
||||||
|
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
|
||||||
|
|
||||||
// Add document
|
// Add document
|
||||||
Document document = testDocument();
|
Document document = testDocument();
|
||||||
document.add(new TextField("value", "test1", Field.Store.YES));
|
document.add(new TextField("value", "test1", Field.Store.YES));
|
||||||
|
@ -2273,7 +2287,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
engine.delete(new Engine.Delete("test", "1", newUid(doc), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
|
engine.delete(new Engine.Delete("test", "1", newUid(doc), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
|
||||||
|
|
||||||
// Get should not find the document
|
// Get should not find the document
|
||||||
Engine.GetResult getResult = engine.get(newGet(true, doc));
|
Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted"));
|
||||||
assertThat(getResult.exists(), equalTo(false));
|
assertThat(getResult.exists(), equalTo(false));
|
||||||
|
|
||||||
// Give the gc pruning logic a chance to kick in
|
// Give the gc pruning logic a chance to kick in
|
||||||
|
@ -2287,7 +2301,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
engine.delete(new Engine.Delete("test", "2", newUid("2"), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
|
engine.delete(new Engine.Delete("test", "2", newUid("2"), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
|
||||||
|
|
||||||
// Get should not find the document (we never indexed uid=2):
|
// Get should not find the document (we never indexed uid=2):
|
||||||
getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2")));
|
getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2")), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists"));
|
||||||
assertThat(getResult.exists(), equalTo(false));
|
assertThat(getResult.exists(), equalTo(false));
|
||||||
|
|
||||||
// Try to index uid=1 with a too-old version, should fail:
|
// Try to index uid=1 with a too-old version, should fail:
|
||||||
|
@ -2297,7 +2311,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
|
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||||
|
|
||||||
// Get should still not find the document
|
// Get should still not find the document
|
||||||
getResult = engine.get(newGet(true, doc));
|
getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists"));
|
||||||
assertThat(getResult.exists(), equalTo(false));
|
assertThat(getResult.exists(), equalTo(false));
|
||||||
|
|
||||||
// Try to index uid=2 with a too-old version, should fail:
|
// Try to index uid=2 with a too-old version, should fail:
|
||||||
|
@ -2307,7 +2321,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
|
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||||
|
|
||||||
// Get should not find the document
|
// Get should not find the document
|
||||||
getResult = engine.get(newGet(true, doc));
|
getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists"));
|
||||||
assertThat(getResult.exists(), equalTo(false));
|
assertThat(getResult.exists(), equalTo(false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3639,6 +3653,8 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
|
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
|
||||||
final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null);
|
final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null);
|
||||||
final Term uid = newUid(doc);
|
final Term uid = newUid(doc);
|
||||||
|
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
|
||||||
|
final AtomicBoolean refreshed = new AtomicBoolean(false);
|
||||||
for (int i = 0; i < numberOfOperations; i++) {
|
for (int i = 0; i < numberOfOperations; i++) {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
final Engine.Index index = new Engine.Index(
|
final Engine.Index index = new Engine.Index(
|
||||||
|
@ -3700,7 +3716,8 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
assertThat(engine.seqNoService().getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
|
assertThat(engine.seqNoService().getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
|
||||||
try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid))) {
|
try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid), searcherFactory, (onRefresh) -> refreshed.set(exists))) {
|
||||||
|
assertEquals("failed to refresh", exists, refreshed.get());
|
||||||
assertThat(result.exists(), equalTo(exists));
|
assertThat(result.exists(), equalTo(exists));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -298,7 +298,8 @@ public class RefreshListenersTests extends ESTestCase {
|
||||||
listener.assertNoError();
|
listener.assertNoError();
|
||||||
|
|
||||||
Engine.Get get = new Engine.Get(false, "test", threadId, new Term(IdFieldMapper.NAME, threadId));
|
Engine.Get get = new Engine.Get(false, "test", threadId, new Term(IdFieldMapper.NAME, threadId));
|
||||||
try (Engine.GetResult getResult = engine.get(get)) {
|
try (Engine.GetResult getResult = engine.get(get, engine::acquireSearcher,
|
||||||
|
onRefresh -> fail("shouldn't have a refresh"))) {
|
||||||
assertTrue("document not found", getResult.exists());
|
assertTrue("document not found", getResult.exists());
|
||||||
assertEquals(iteration, getResult.version());
|
assertEquals(iteration, getResult.version());
|
||||||
SingleFieldsVisitor visitor = new SingleFieldsVisitor("test");
|
SingleFieldsVisitor visitor = new SingleFieldsVisitor("test");
|
||||||
|
|
Loading…
Reference in New Issue