Expose translog stats in ReadOnlyEngine (#43752) (#43823)

Backport of #43752 for 7.x.
This commit is contained in:
Tanguy Leroux 2019-07-02 13:39:00 +02:00 committed by GitHub
parent c8ed271937
commit b977f019b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 253 additions and 4 deletions

View File

@ -79,3 +79,47 @@ setup:
indices.stats:
metric: [ translog ]
- gte: { indices.test.primaries.translog.earliest_last_modified_age: 0 }
---
"Translog stats on closed indices":
- skip:
version: " - 7.2.99"
reason: "closed indices have translog stats starting version 7.3.0"
- do:
index:
index: test
id: 1
body: { "foo": "bar" }
- do:
index:
index: test
id: 2
body: { "foo": "bar" }
- do:
index:
index: test
id: 3
body: { "foo": "bar" }
- do:
indices.stats:
metric: [ translog ]
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 3 }
- do:
indices.close:
index: test
wait_for_active_shards: 1
- is_true: acknowledged
- do:
indices.stats:
metric: [ translog ]
expand_wildcards: all
forbid_closed_indices: false
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }

View File

@ -42,6 +42,8 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.index.translog.TranslogStats;
import java.io.Closeable;
@ -108,7 +110,6 @@ public class ReadOnlyEngine extends Engine {
// yet this makes sure nobody else does. including some testing tools that try to be messy
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
@ -119,6 +120,8 @@ public class ReadOnlyEngine extends Engine {
reader = wrapReader(reader, readerWrapperFunction);
searcherManager = new SearcherManager(reader, searcherFactory);
this.docsStats = docsStats(lastCommittedSegmentInfos);
assert translogStats != null || obtainLock : "mutiple translogs instances should not be opened at the same time";
this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos);
this.indexWriterLock = indexWriterLock;
success = true;
} finally {
@ -216,6 +219,26 @@ public class ReadOnlyEngine extends Engine {
return new SeqNoStats(maxSeqNo, localCheckpoint, config.getGlobalCheckpointSupplier().getAsLong());
}
private static TranslogStats translogStats(final EngineConfig config, final SegmentInfos infos) throws IOException {
final String translogUuid = infos.getUserData().get(Translog.TRANSLOG_UUID_KEY);
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = config.getTranslogConfig();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
config.getIndexSettings().getTranslogRetentionSize().getBytes(),
config.getIndexSettings().getTranslogRetentionAge().getMillis()
);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(), seqNo -> {})
) {
return translog.stats();
}
}
@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);

View File

@ -29,6 +29,7 @@ import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogStats;
import java.io.IOException;
import java.util.List;
@ -37,6 +38,7 @@ import java.util.function.Function;
import static org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader.getElasticsearchDirectoryReader;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
public class ReadOnlyEngineTests extends EngineTestCase {
@ -183,7 +185,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
store.createEmpty(Version.CURRENT.luceneVersion);
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) {
Class<? extends Throwable> expectedException = LuceneTestCase.TEST_ASSERTS_ENABLED ? AssertionError.class :
UnsupportedOperationException.class;
expectThrows(expectedException, () -> readOnlyEngine.index(null));
@ -204,7 +206,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
store.createEmpty(Version.CURRENT.luceneVersion);
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) {
globalCheckpoint.set(randomNonNegativeLong());
try {
readOnlyEngine.verifyEngineBeforeIndexClosing();
@ -242,4 +244,46 @@ public class ReadOnlyEngineTests extends EngineTestCase {
}
}
}
public void testTranslogStats() throws IOException {
IOUtils.close(engine, store);
try (Store store = createStore()) {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
final int numDocs = frequently() ? scaledRandomIntBetween(10, 200) : 0;
int uncommittedDocs = 0;
try (InternalEngine engine = createEngine(config)) {
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
if (rarely()) {
engine.flush();
uncommittedDocs = 0;
} else {
uncommittedDocs += 1;
}
globalCheckpoint.set(i);
}
assertThat(engine.getTranslogStats().estimatedNumberOfOperations(), equalTo(numDocs));
assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(uncommittedDocs));
assertThat(engine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
assertThat(engine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
assertThat(engine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L));
engine.flush(true, true);
}
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity())) {
assertThat(readOnlyEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(numDocs));
assertThat(readOnlyEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
assertThat(readOnlyEngine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
assertThat(readOnlyEngine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L));
}
}
}
}

View File

@ -4083,7 +4083,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRouting readonlyShardRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true,
ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.INSTANCE);
final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting,
engineConfig -> new ReadOnlyEngine(engineConfig, null, null, false, Function.identity()) {
engineConfig -> new ReadOnlyEngine(engineConfig, null, null, true, Function.identity()) {
@Override
protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
// just like a following shard, we need to skip this check for now.

View File

@ -22,8 +22,11 @@ package org.elasticsearch.indices.state;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
@ -34,6 +37,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import java.io.IOException;
@ -54,6 +58,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
public class OpenCloseIndexIT extends ESIntegTestCase {
public void testSimpleCloseOpen() {
@ -346,4 +351,38 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
}
}
}
public void testTranslogStats() {
final String indexName = "test";
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
final int nbDocs = randomIntBetween(0, 50);
int uncommittedOps = 0;
for (long i = 0; i < nbDocs; i++) {
final IndexResponse indexResponse = client().prepareIndex(indexName, "_doc", Long.toString(i)).setSource("field", i).get();
assertThat(indexResponse.status(), is(RestStatus.CREATED));
if (rarely()) {
client().admin().indices().prepareFlush(indexName).get();
uncommittedOps = 0;
} else {
uncommittedOps += 1;
}
}
IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().setTranslog(true).get();
assertThat(stats.getIndex(indexName), notNullValue());
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(uncommittedOps));
assertAcked(client().admin().indices().prepareClose("test").setWaitForActiveShards(ActiveShardCount.ONE));
IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN_CLOSED;
stats = client().admin().indices().prepareStats(indexName).setIndicesOptions(indicesOptions).clear().setTranslog(true).get();
assertThat(stats.getIndex(indexName), notNullValue());
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(0));
}
}

View File

@ -409,4 +409,39 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
assertThat(recoveryState.getTranslog().recoveredPercent(), equalTo(100.0f));
}
}
public void testTranslogStats() throws Exception {
final String indexName = "test";
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
final int nbDocs = randomIntBetween(0, 50);
int uncommittedOps = 0;
for (long i = 0; i < nbDocs; i++) {
final IndexResponse indexResponse = client().prepareIndex(indexName, "_doc", Long.toString(i)).setSource("field", i).get();
assertThat(indexResponse.status(), is(RestStatus.CREATED));
if (rarely()) {
client().admin().indices().prepareFlush(indexName).get();
uncommittedOps = 0;
} else {
uncommittedOps += 1;
}
}
IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().setTranslog(true).get();
assertThat(stats.getIndex(indexName), notNullValue());
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(uncommittedOps));
assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName)));
assertIndexFrozen(indexName);
IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN_CLOSED;
stats = client().admin().indices().prepareStats(indexName).setIndicesOptions(indicesOptions).clear().setTranslog(true).get();
assertThat(stats.getIndex(indexName), notNullValue());
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().estimatedNumberOfOperations(), equalTo(nbDocs));
assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(0));
}
}

View File

@ -0,0 +1,64 @@
---
setup:
- do:
indices.create:
index: test
- do:
cluster.health:
wait_for_no_initializing_shards: true
---
"Translog stats on frozen indices":
- skip:
version: " - 7.2.99"
reason: "frozen indices have translog stats starting version 7.3.0"
- do:
index:
index: test
id: 1
body: { "foo": "bar" }
- do:
index:
index: test
id: 2
body: { "foo": "bar" }
- do:
index:
index: test
id: 3
body: { "foo": "bar" }
- do:
indices.stats:
metric: [ translog ]
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 3 }
# freeze index
- do:
indices.freeze:
index: test
wait_for_active_shards: 1
- is_true: acknowledged
- do:
indices.stats:
metric: [ translog ]
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }
# unfreeze index
- do:
indices.freeze:
index: test
wait_for_active_shards: 1
- is_true: acknowledged
- do:
indices.stats:
metric: [ translog ]
- match: { indices.test.primaries.translog.operations: 3 }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }