Stats: add CommitStats to supply information about the current commit point

Extends ShardStats with commit specific information. We currently expose commit id, generation and the user data map.

The information is also retrievable via the Rest API by using `GET _stats?level=shards`

Closes #10687
This commit is contained in:
Boaz Leskes 2015-04-20 15:25:25 +02:00
parent 1ae87ca4a2
commit a1ba339517
11 changed files with 235 additions and 44 deletions

View File

@ -66,4 +66,6 @@ setup:
- is_true: indices.test2.total.docs
- is_true: indices.test2.total.docs
- is_true: indices.test2.shards
- is_true: indices.test1.shards.0.0.commit.id
- is_true: indices.test2.shards.0.0.commit.id

View File

@ -21,11 +21,13 @@ package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.shard.IndexShard;
import java.io.IOException;
@ -38,7 +40,10 @@ public class ShardStats extends BroadcastShardOperationResponse implements ToXCo
private ShardRouting shardRouting;
CommonStats stats;
CommonStats commonStats;
@Nullable
CommitStats commitStats;
ShardStats() {
}
@ -46,7 +51,8 @@ public class ShardStats extends BroadcastShardOperationResponse implements ToXCo
public ShardStats(IndexShard indexShard, ShardRouting shardRouting, CommonStatsFlags flags) {
super(indexShard.shardId());
this.shardRouting = shardRouting;
this.stats = new CommonStats(indexShard, flags);
this.commonStats = new CommonStats(indexShard, flags);
this.commitStats = indexShard.commitStats();
}
/**
@ -57,7 +63,11 @@ public class ShardStats extends BroadcastShardOperationResponse implements ToXCo
}
public CommonStats getStats() {
return this.stats;
return this.commonStats;
}
public CommitStats getCommitStats() {
return this.commitStats;
}
public static ShardStats readShardStats(StreamInput in) throws IOException {
@ -70,14 +80,16 @@ public class ShardStats extends BroadcastShardOperationResponse implements ToXCo
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardRouting = readShardRoutingEntry(in);
stats = CommonStats.readCommonStats(in);
commonStats = CommonStats.readCommonStats(in);
commitStats = CommitStats.readOptionalCommitStatsFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardRouting.writeTo(out);
stats.writeTo(out);
commonStats.writeTo(out);
out.writeOptionalStreamable(commitStats);
}
@Override
@ -89,7 +101,10 @@ public class ShardStats extends BroadcastShardOperationResponse implements ToXCo
.field(Fields.RELOCATING_NODE, shardRouting.relocatingNodeId())
.endObject();
stats.toXContent(builder, params);
commonStats.toXContent(builder, params);
if (commitStats != null) {
commitStats.toXContent(builder, params);
}
return builder;
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
import java.util.Map;
/** a class the returns dynamic information with respect to the last commit point of this shard */
public final class CommitStats implements Streamable, ToXContent {
private Map<String, String> userData;
private long generation;
private String id; // lucene commit id in base 64;
public CommitStats(SegmentInfos segmentInfos) {
// clone the map to protect against concurrent changes
userData = MapBuilder.<String, String>newMapBuilder().putAll(segmentInfos.getUserData()).immutableMap();
// lucene calls the current generation, last generation.
generation = segmentInfos.getLastGeneration();
id = Base64.encodeBytes(segmentInfos.getId());
}
private CommitStats() {
}
public static CommitStats readCommitStatsFrom(StreamInput in) throws IOException {
CommitStats commitStats = new CommitStats();
commitStats.readFrom(in);
return commitStats;
}
public static CommitStats readOptionalCommitStatsFrom(StreamInput in) throws IOException {
return in.readOptionalStreamable(new CommitStats());
}
public Map<String, String> getUserData() {
return userData;
}
public long getGeneration() {
return generation;
}
/** base64 version of the commit id (see {@link SegmentInfos#getId()} */
public String getId() {
return id;
}
@Override
public void readFrom(StreamInput in) throws IOException {
MapBuilder<String, String> builder = MapBuilder.newMapBuilder();
for (int i = in.readVInt(); i > 0; i--) {
builder.put(in.readString(), in.readOptionalString());
}
userData = builder.immutableMap();
generation = in.readLong();
id = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(userData.size());
for (Map.Entry<String, String> entry : userData.entrySet()) {
out.writeString(entry.getKey());
out.writeOptionalString(entry.getValue());
}
out.writeLong(generation);
out.writeString(id);
}
static final class Fields {
static final XContentBuilderString GENERATION = new XContentBuilderString("generation");
static final XContentBuilderString USER_DATA = new XContentBuilderString("user_data");
static final XContentBuilderString ID = new XContentBuilderString("id");
static final XContentBuilderString COMMIT = new XContentBuilderString("commit");
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.COMMIT);
builder.field(Fields.ID, id);
builder.field(Fields.GENERATION, generation);
builder.field(Fields.USER_DATA, userData);
builder.endObject();
return builder;
}
}

View File

@ -137,6 +137,8 @@ public abstract class Engine implements Closeable {
return engineConfig;
}
protected abstract SegmentInfos getLastCommittedSegmentInfos();
/** A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
* is enabled
@ -281,6 +283,13 @@ public abstract class Engine implements Closeable {
}
}
/** get commits stats for the last commit */
public CommitStats commitStats() {
return new CommitStats(getLastCommittedSegmentInfos());
}
/**
* Global stats on segments.
*/

View File

@ -20,7 +20,6 @@
package org.elasticsearch.index.engine;
import com.google.common.collect.Lists;
import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.*;
@ -105,7 +104,7 @@ public class InternalEngine extends Engine {
private final AtomicLong translogIdGenerator = new AtomicLong();
private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();
private SegmentInfos lastCommittedSegmentInfos;
private volatile SegmentInfos lastCommittedSegmentInfos;
private final IndexThrottle throttle;
@ -899,6 +898,11 @@ public class InternalEngine extends Engine {
return false;
}
@Override
protected SegmentInfos getLastCommittedSegmentInfos() {
return lastCommittedSegmentInfos;
}
@Override
protected final void writerSegmentStats(SegmentsStats stats) {
stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());

View File

@ -20,16 +20,12 @@
package org.elasticsearch.index.engine;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
@ -38,9 +34,6 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* ShadowEngine is a specialized engine that only allows read-only operations
@ -64,7 +57,7 @@ public class ShadowEngine extends Engine {
private volatile SearcherManager searcherManager;
private SegmentInfos lastCommittedSegmentInfos;
private volatile SegmentInfos lastCommittedSegmentInfos;
public ShadowEngine(EngineConfig engineConfig) {
super(engineConfig);
@ -221,4 +214,9 @@ public class ShadowEngine extends Engine {
public boolean hasUncommittedChanges() {
return false;
}
@Override
protected SegmentInfos getLastCommittedSegmentInfos() {
return lastCommittedSegmentInfos;
}
}

View File

@ -592,6 +592,15 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
/**
* @return {@link CommitStats} if engine is open, otherwise null
*/
@Nullable
public CommitStats commitStats() {
Engine engine = engineUnsafe();
return engine == null ? null : engine.commitStats();
}
public IndexingStats indexingStats(String... types) {
return indexingService.stats(types);
}
@ -1258,7 +1267,6 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
private String getIndexUUID() {
assert indexSettings.get(IndexMetaData.SETTING_UUID) != null
|| indexSettings.getAsVersion(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).before(Version.V_0_90_6) :

View File

@ -157,7 +157,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
final void ensureOpen() { // for testing
final void ensureOpen() {
if (this.refCounter.refCount() <= 0) {
throw new AlreadyClosedException("store is already closed");
}

View File

@ -21,11 +21,12 @@ package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*;
public class IndicesStatsTests extends ElasticsearchSingleNodeTest {
@ -81,4 +82,20 @@ public class IndicesStatsTests extends ElasticsearchSingleNodeTest {
assertThat(stats2.getNormsMemoryInBytes(), greaterThan(stats.getNormsMemoryInBytes()));
assertThat(stats2.getDocValuesMemoryInBytes(), greaterThan(stats.getDocValuesMemoryInBytes()));
}
public void testCommitStats() throws Exception {
createIndex("test");
ensureGreen("test");
IndicesStatsResponse rsp = client().admin().indices().prepareStats("test").get();
for (ShardStats shardStats : rsp.getIndex("test").getShards()) {
final CommitStats commitStats = shardStats.getCommitStats();
assertNotNull(commitStats);
assertThat(commitStats.getGeneration(), greaterThan(0l));
assertThat(commitStats.getId(), notNullValue());
assertThat(commitStats.getUserData(), hasKey(Translog.TRANSLOG_ID_KEY));
}
}
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.index.engine;
import com.google.common.collect.ImmutableMap;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@ -30,12 +29,7 @@ import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.*;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
@ -65,16 +59,9 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine.Searcher;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.mapper.ContentPath;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperParser;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.Mapper.BuilderContext;
import org.elasticsearch.index.mapper.MapperAnalyzer;
import org.elasticsearch.index.mapper.MapperBuilders;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.RootMapper;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.object.RootObjectMapper;
@ -111,10 +98,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
// TODO: this guy isn't ready for mock filesystems yet
@SuppressFileSystems("*")
@ -488,6 +472,26 @@ public class InternalEngineTests extends ElasticsearchTestCase {
}
}
public void testCommitStats() {
Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
engine.create(new Engine.Create(null, newUid("1"), doc));
CommitStats stats1 = engine.commitStats();
assertThat(stats1.getGeneration(), greaterThan(0l));
assertThat(stats1.getId(), notNullValue());
assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_ID_KEY));
engine.flush(true, true);
CommitStats stats2 = engine.commitStats();
assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration()));
assertThat(stats2.getId(), notNullValue());
assertThat(stats2.getId(), not(equalTo(stats1.getId())));
assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_ID_KEY));
assertThat(stats2.getUserData().get(Translog.TRANSLOG_ID_KEY), not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_ID_KEY))));
}
@Test
public void testSimpleOperations() throws Exception {
Engine.Searcher searchResult = engine.acquireSearcher("test");

View File

@ -30,7 +30,6 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
@ -76,11 +75,7 @@ import java.util.Arrays;
import java.util.List;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.test.ElasticsearchTestCase.terminate;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
/**
* TODO: document me!
@ -268,6 +263,30 @@ public class ShadowEngineTests extends ElasticsearchTestCase {
protected static final BytesReference B_2 = new BytesArray(new byte[]{2});
protected static final BytesReference B_3 = new BytesArray(new byte[]{3});
public void testCommitStats() {
// create a doc and refresh
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
primaryEngine.create(new Engine.Create(null, newUid("1"), doc));
CommitStats stats1 = replicaEngine.commitStats();
assertThat(stats1.getGeneration(), greaterThan(0l));
assertThat(stats1.getId(), notNullValue());
assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_ID_KEY));
// flush the primary engine
primaryEngine.flush();
// flush on replica to make flush visible
replicaEngine.flush();
CommitStats stats2 = replicaEngine.commitStats();
assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration()));
assertThat(stats2.getId(), notNullValue());
assertThat(stats2.getId(), not(equalTo(stats1.getId())));
assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_ID_KEY));
assertThat(stats2.getUserData().get(Translog.TRANSLOG_ID_KEY), not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_ID_KEY))));
}
@Test
public void testSegments() throws Exception {
List<Segment> segments = primaryEngine.segments(false);