Merge pull request #14000 from s1monw/issues/12730
Don't pull translog from shadow engine
This commit is contained in:
commit
7e53123f1f
|
@ -33,7 +33,6 @@ import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
|
|||
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
|
||||
import org.elasticsearch.action.termvectors.TermVectorsRequest;
|
||||
import org.elasticsearch.action.termvectors.TermVectorsResponse;
|
||||
import org.elasticsearch.bootstrap.Elasticsearch;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.index.engine.EngineConfig;
|
|||
import org.elasticsearch.index.merge.MergeStats;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.translog.TranslogStats;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -82,4 +83,9 @@ public final class ShadowIndexShard extends IndexShard {
|
|||
public boolean allowsPrimaryPromotion() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TranslogStats translogStats() {
|
||||
return null; // shadow engine has no translog
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,11 +18,10 @@
|
|||
*/
|
||||
package org.elasticsearch.index.translog;
|
||||
|
||||
import org.elasticsearch.action.support.ToXContentToBytes;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
|
||||
|
@ -31,17 +30,23 @@ import java.io.IOException;
|
|||
/**
|
||||
*
|
||||
*/
|
||||
public class TranslogStats implements ToXContent, Streamable {
|
||||
public class TranslogStats extends ToXContentToBytes implements Streamable {
|
||||
|
||||
private long translogSizeInBytes = 0;
|
||||
private int estimatedNumberOfOperations = -1;
|
||||
private long translogSizeInBytes;
|
||||
private int numberOfOperations;
|
||||
|
||||
public TranslogStats() {
|
||||
}
|
||||
|
||||
public TranslogStats(int estimatedNumberOfOperations, long translogSizeInBytes) {
|
||||
public TranslogStats(int numberOfOperations, long translogSizeInBytes) {
|
||||
if (numberOfOperations < 0) {
|
||||
throw new IllegalArgumentException("numberOfOperations must be >= 0");
|
||||
}
|
||||
if (translogSizeInBytes < 0) {
|
||||
throw new IllegalArgumentException("translogSizeInBytes must be >= 0");
|
||||
}
|
||||
assert translogSizeInBytes >= 0 : "translogSizeInBytes must be >= 0, got [" + translogSizeInBytes + "]";
|
||||
this.estimatedNumberOfOperations = estimatedNumberOfOperations;
|
||||
this.numberOfOperations = numberOfOperations;
|
||||
this.translogSizeInBytes = translogSizeInBytes;
|
||||
}
|
||||
|
||||
|
@ -50,22 +55,22 @@ public class TranslogStats implements ToXContent, Streamable {
|
|||
return;
|
||||
}
|
||||
|
||||
this.estimatedNumberOfOperations += translogStats.estimatedNumberOfOperations;
|
||||
this.translogSizeInBytes = +translogStats.translogSizeInBytes;
|
||||
this.numberOfOperations += translogStats.numberOfOperations;
|
||||
this.translogSizeInBytes += translogStats.translogSizeInBytes;
|
||||
}
|
||||
|
||||
public ByteSizeValue translogSizeInBytes() {
|
||||
return new ByteSizeValue(translogSizeInBytes);
|
||||
public long getTranslogSizeInBytes() {
|
||||
return translogSizeInBytes;
|
||||
}
|
||||
|
||||
public long estimatedNumberOfOperations() {
|
||||
return estimatedNumberOfOperations;
|
||||
return numberOfOperations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(Fields.TRANSLOG);
|
||||
builder.field(Fields.OPERATIONS, estimatedNumberOfOperations);
|
||||
builder.field(Fields.OPERATIONS, numberOfOperations);
|
||||
builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, translogSizeInBytes);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
|
@ -80,13 +85,13 @@ public class TranslogStats implements ToXContent, Streamable {
|
|||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
estimatedNumberOfOperations = in.readVInt();
|
||||
numberOfOperations = in.readVInt();
|
||||
translogSizeInBytes = in.readVLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(estimatedNumberOfOperations);
|
||||
out.writeVInt(numberOfOperations);
|
||||
out.writeVLong(translogSizeInBytes);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ import org.elasticsearch.ExceptionsHelper;
|
|||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
|
@ -36,6 +38,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShadowIndexShard;
|
||||
import org.elasticsearch.index.translog.TranslogStats;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.recovery.RecoveryTarget;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
@ -175,6 +178,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
|
|||
Settings idxSettings = Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
||||
.put(IndexShard.INDEX_TRANSLOG_DISABLE_FLUSH, true)
|
||||
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
|
||||
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
|
||||
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
|
||||
|
@ -188,6 +192,21 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
|
|||
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
|
||||
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
|
||||
|
||||
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(IDX).clear().setTranslog(true).get();
|
||||
assertEquals(2, indicesStatsResponse.getIndex(IDX).getPrimaries().getTranslog().estimatedNumberOfOperations());
|
||||
assertEquals(2, indicesStatsResponse.getIndex(IDX).getTotal().getTranslog().estimatedNumberOfOperations());
|
||||
for (IndicesService service : internalCluster().getInstances(IndicesService.class)) {
|
||||
IndexService indexService = service.indexService(IDX);
|
||||
if (indexService != null) {
|
||||
IndexShard shard = indexService.getShard(0);
|
||||
TranslogStats translogStats = shard.translogStats();
|
||||
assertTrue(translogStats != null || shard instanceof ShadowIndexShard);
|
||||
if (translogStats != null) {
|
||||
assertEquals(2, translogStats.estimatedNumberOfOperations());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check that we can get doc 1 and 2, because we are doing realtime
|
||||
// gets and getting from the primary
|
||||
GetResponse gResp1 = client().prepareGet(IDX, "doc", "1").setRealtime(true).setFields("foo").get();
|
||||
|
|
|
@ -965,4 +965,13 @@ public class ShadowEngineTests extends ESTestCase {
|
|||
// (shadow engine is already shut down in the try-with-resources)
|
||||
IOUtils.close(srStore, pEngine, pStore);
|
||||
}
|
||||
|
||||
public void testNoTranslog() {
|
||||
try {
|
||||
replicaEngine.getTranslog();
|
||||
fail("shadow engine has no translog");
|
||||
} catch (UnsupportedOperationException ex) {
|
||||
// all good
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.common.io.FileSystemUtils;
|
|||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
|
@ -276,32 +277,63 @@ public class TranslogTests extends ESTestCase {
|
|||
final long firstOperationPosition = translog.getFirstOperationPosition();
|
||||
TranslogStats stats = stats();
|
||||
assertThat(stats.estimatedNumberOfOperations(), equalTo(0l));
|
||||
long lastSize = stats.translogSizeInBytes().bytes();
|
||||
long lastSize = stats.getTranslogSizeInBytes();
|
||||
assertThat((int) firstOperationPosition, greaterThan(CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC)));
|
||||
assertThat(lastSize, equalTo(firstOperationPosition));
|
||||
|
||||
TranslogStats total = new TranslogStats();
|
||||
translog.add(new Translog.Index("test", "1", new byte[]{1}));
|
||||
stats = stats();
|
||||
total.add(stats);
|
||||
assertThat(stats.estimatedNumberOfOperations(), equalTo(1l));
|
||||
assertThat(stats.translogSizeInBytes().bytes(), greaterThan(lastSize));
|
||||
lastSize = stats.translogSizeInBytes().bytes();
|
||||
assertThat(stats.getTranslogSizeInBytes(), greaterThan(lastSize));
|
||||
lastSize = stats.getTranslogSizeInBytes();
|
||||
|
||||
translog.add(new Translog.Delete(newUid("2")));
|
||||
stats = stats();
|
||||
total.add(stats);
|
||||
assertThat(stats.estimatedNumberOfOperations(), equalTo(2l));
|
||||
assertThat(stats.translogSizeInBytes().bytes(), greaterThan(lastSize));
|
||||
lastSize = stats.translogSizeInBytes().bytes();
|
||||
assertThat(stats.getTranslogSizeInBytes(), greaterThan(lastSize));
|
||||
lastSize = stats.getTranslogSizeInBytes();
|
||||
|
||||
translog.add(new Translog.Delete(newUid("3")));
|
||||
translog.prepareCommit();
|
||||
stats = stats();
|
||||
total.add(stats);
|
||||
assertThat(stats.estimatedNumberOfOperations(), equalTo(3l));
|
||||
assertThat(stats.translogSizeInBytes().bytes(), greaterThan(lastSize));
|
||||
assertThat(stats.getTranslogSizeInBytes(), greaterThan(lastSize));
|
||||
|
||||
translog.commit();
|
||||
stats = stats();
|
||||
total.add(stats);
|
||||
assertThat(stats.estimatedNumberOfOperations(), equalTo(0l));
|
||||
assertThat(stats.translogSizeInBytes().bytes(), equalTo(firstOperationPosition));
|
||||
assertThat(stats.getTranslogSizeInBytes(), equalTo(firstOperationPosition));
|
||||
assertEquals(6, total.estimatedNumberOfOperations());
|
||||
assertEquals(431, total.getTranslogSizeInBytes());
|
||||
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
total.writeTo(out);
|
||||
TranslogStats copy = new TranslogStats();
|
||||
copy.readFrom(StreamInput.wrap(out.bytes()));
|
||||
|
||||
assertEquals(6, copy.estimatedNumberOfOperations());
|
||||
assertEquals(431, copy.getTranslogSizeInBytes());
|
||||
assertEquals("\"translog\"{\n" +
|
||||
" \"operations\" : 6,\n" +
|
||||
" \"size_in_bytes\" : 431\n" +
|
||||
"}", copy.toString().trim());
|
||||
|
||||
try {
|
||||
new TranslogStats(1, -1);
|
||||
fail("must be positive");
|
||||
} catch (IllegalArgumentException ex) {
|
||||
//all well
|
||||
}
|
||||
try {
|
||||
new TranslogStats(-1, 1);
|
||||
fail("must be positive");
|
||||
} catch (IllegalArgumentException ex) {
|
||||
//all well
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue