diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 27667cf798b..95a40ca3f14 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -171,7 +171,7 @@ public class TransportIndexAction extends TransportWriteAction executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index 32beb5cb163..a03daf2ff23 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.SnapshotStatus; import java.util.LinkedList; @@ -157,11 +156,14 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { private FixedBitSet getBitSetForSeqNo(long seqNo) { assert Thread.holdsLock(this); assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo; - int bitSetOffset = ((int) (seqNo - firstProcessedSeqNo)) / bitArraysSize; + final long bitSetOffset = (seqNo - firstProcessedSeqNo) / bitArraysSize; + if (bitSetOffset > Integer.MAX_VALUE) { + throw new IndexOutOfBoundsException("seqNo too high. got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]"); + } while (bitSetOffset >= processedSeqNo.size()) { processedSeqNo.add(new FixedBitSet(bitArraysSize)); } - return processedSeqNo.get(bitSetOffset); + return processedSeqNo.get((int)bitSetOffset); } /** maps the given seqNo to a position in the bit set returned by {@link #getBitSetForSeqNo} */ diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ec60a6b863d..3bdb939c686 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -493,12 +493,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return previousState; } - public Engine.Index prepareIndexOnPrimary(SourceToParse source, long seqNo, long version, VersionType versionType, long autoGeneratedIdTimestamp, + public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp, boolean isRetry) { try { verifyPrimary(); - return prepareIndex(docMapper(source.type()), source, seqNo, version, versionType, Engine.Operation.Origin.PRIMARY, - autoGeneratedIdTimestamp, isRetry); + return prepareIndex(docMapper(source.type()), source, SequenceNumbersService.UNASSIGNED_SEQ_NO, version, versionType, + Engine.Operation.Origin.PRIMARY, autoGeneratedIdTimestamp, isRetry); } catch (Exception e) { verifyNotClosed(e); throw e; @@ -527,6 +527,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl MappedFieldType uidFieldType = docMapper.getDocumentMapper().uidMapper().fieldType(); Query uidQuery = uidFieldType.termQuery(doc.uid(), null); Term uid = MappedFieldType.extractTerm(uidQuery); + doc.seqNo().setLongValue(seqNo); return new Engine.Index(uid, doc, seqNo, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 537005f893f..c3e3200d14a 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -797,7 +797,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC public static final int FORMAT_2x = 6; // since 2.0-beta1 and 1.1 public static final int FORMAT_AUTO_GENERATED_IDS = 7; // since 5.0.0-beta1 public static final int FORMAT_SEQ_NO = FORMAT_AUTO_GENERATED_IDS + 1; // since 6.0.0 - public static final int SERIALIZATION_FORMAT = FORMAT_SEQ_NO + 1; + public static final int SERIALIZATION_FORMAT = FORMAT_SEQ_NO; private final String id; private final long autoGeneratedIdTimestamp; private final String type; @@ -991,7 +991,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } public static class Delete implements Operation { - public static final int SERIALIZATION_FORMAT = 3; + private static final int FORMAT_5_X = 3; + private static final int FORMAT_SEQ_NO = FORMAT_5_X + 1; + public static final int SERIALIZATION_FORMAT = FORMAT_SEQ_NO; private Term uid; private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; @@ -1005,8 +1007,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC this.version = in.readLong(); this.versionType = VersionType.fromValue(in.readByte()); assert versionType.validateVersionForWrites(this.version); - if (format >= 3) { - seqNo = in.readZLong(); + if (format >= FORMAT_SEQ_NO) { + seqNo = in.readVLong(); } } @@ -1016,7 +1018,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC /** utility for testing */ public Delete(Term uid) { - this(uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.MATCH_ANY, VersionType.INTERNAL); + this(uid, 0, Versions.MATCH_ANY, VersionType.INTERNAL); } public Delete(Term uid, long seqNo, long version, VersionType versionType) { @@ -1064,7 +1066,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC out.writeString(uid.text()); out.writeLong(version); out.writeByte(versionType.getValue()); - out.writeZLong(seqNo); + out.writeVLong(seqNo); } @Override diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index d9cefff1fcd..b8557cf4b98 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -95,7 +95,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final ShardStateAction shardStateAction; private final NodeMappingRefreshAction nodeMappingRefreshAction; private final NodeServicesProvider nodeServicesProvider; - private final GlobalCheckpointSyncAction globalCheckpointSyncAction; + private final Consumer globalCheckpointSyncer; private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new ShardStateAction.Listener() { }; @@ -123,7 +123,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple this(settings, (AllocatedIndices>) indicesService, clusterService, threadPool, recoveryTargetService, shardStateAction, nodeMappingRefreshAction, repositoriesService, restoreService, searchService, syncedFlushService, peerRecoverySourceService, - nodeServicesProvider, globalCheckpointSyncAction); + nodeServicesProvider, globalCheckpointSyncAction::updateCheckpointForShard); } // for tests @@ -136,7 +136,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple RepositoriesService repositoriesService, RestoreService restoreService, SearchService searchService, SyncedFlushService syncedFlushService, PeerRecoverySourceService peerRecoverySourceService, NodeServicesProvider nodeServicesProvider, - GlobalCheckpointSyncAction globalCheckpointSyncAction) { + Consumer globalCheckpointSyncer) { super(settings); this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, syncedFlushService); this.indicesService = indicesService; @@ -149,7 +149,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple this.repositoriesService = repositoriesService; this.sendRefreshMapping = this.settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); this.nodeServicesProvider = nodeServicesProvider; - this.globalCheckpointSyncAction = globalCheckpointSyncAction; + this.globalCheckpointSyncer = globalCheckpointSyncer; } @Override @@ -425,11 +425,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple AllocatedIndex indexService = null; try { indexService = - indicesService.createIndex( - nodeServicesProvider, - indexMetaData, - buildInIndexListener, - globalCheckpointSyncAction::updateCheckpointForShard); + indicesService.createIndex(nodeServicesProvider, indexMetaData, buildInIndexListener, globalCheckpointSyncer); if (indexService.updateMapping(indexMetaData) && sendRefreshMapping) { nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(), new NodeMappingRefreshAction.NodeMappingRefreshRequest(indexMetaData.getIndex().getName(), diff --git a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java index 5081b414e81..b755da59e17 100644 --- a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java @@ -181,6 +181,7 @@ public class IndexRequestTests extends ESTestCase { assertEquals(forcedRefresh, indexResponse.forcedRefresh()); assertEquals("IndexResponse[index=" + shardId.getIndexName() + ",type=" + type + ",id="+ id + ",version=" + version + ",result=" + (created ? "created" : "updated") + + ",seqNo=" + SequenceNumbersService.UNASSIGNED_SEQ_NO + ",shards={\"_shards\":{\"total\":" + total + ",\"successful\":" + successful + ",\"failed\":0}}]", indexResponse.toString()); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index fd4bc51e642..f98b6c47d93 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; @@ -656,12 +657,12 @@ public class TransportReplicationActionTests extends ESTestCase { setState(clusterService, state(index, true, ShardRoutingState.STARTED, randomFrom(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED))); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); + final ShardRouting routingEntry = clusterService.state().getRoutingTable().index("test").shard(0).primaryShard(); Request request = new Request(shardId); TransportReplicationAction.ConcreteShardRequest concreteShardRequest = - new TransportReplicationAction.ConcreteShardRequest<>(request, null); + new TransportReplicationAction.ConcreteShardRequest<>(request, routingEntry.allocationId().getId()); PlainActionFuture listener = new PlainActionFuture<>(); - final ShardRouting routingEntry = clusterService.state().getRoutingTable().index("test").shard(0).primaryShard(); final IndexShard shard = mock(IndexShard.class); long primaryTerm = clusterService.state().getMetaData().index(index).primaryTerm(0); @@ -691,7 +692,8 @@ public class TransportReplicationActionTests extends ESTestCase { primaryPhase.messageReceived(concreteShardRequest, createTransportChannel(listener), null); CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests(); assertThat(requestsToReplicas, arrayWithSize(1)); - assertThat(((Request) requestsToReplicas[0].request).primaryTerm(), equalTo(primaryTerm)); + assertThat(((TransportReplicationAction.ConcreteShardRequest) requestsToReplicas[0].request).getRequest().primaryTerm(), + equalTo(primaryTerm)); } public void testCounterOnPrimary() throws Exception { @@ -916,9 +918,15 @@ public class TransportReplicationActionTests extends ESTestCase { final CapturingTransport.CapturedRequest capturedRequest = capturedRequests.get(0); assertThat(capturedRequest.action, equalTo("testActionWithExceptions[r]")); assertThat(capturedRequest.request, instanceOf(TransportReplicationAction.ConcreteShardRequest.class)); - assertThat(((TransportReplicationAction.ConcreteShardRequest) capturedRequest.request).getRequest(), equalTo(request)); - assertThat(((TransportReplicationAction.ConcreteShardRequest) capturedRequest.request).getTargetAllocationID(), - equalTo(replica.allocationId().getId())); + assertConcreteShardRequest(capturedRequest.request, request, replica.allocationId()); + } + + private void assertConcreteShardRequest(TransportRequest capturedRequest, Request expectedRequest, AllocationId expectedAllocationId) { + final TransportReplicationAction.ConcreteShardRequest concreteShardRequest = + (TransportReplicationAction.ConcreteShardRequest) capturedRequest; + assertThat(concreteShardRequest.getRequest(), equalTo(expectedRequest)); + assertThat(concreteShardRequest.getTargetAllocationID(), equalTo(expectedAllocationId.getId())); + } diff --git a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java index e4e7927872b..8e958457e67 100644 --- a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java +++ b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java @@ -88,6 +88,9 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; // needs at least 2 nodes since it bumps replicas to 1 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) @LuceneTestCase.SuppressFileSystems("ExtrasFS") +@LuceneTestCase.AwaitsFix(bugUrl = "needs a solution for translog operations that are recovered from the translog, don't have a seq no " + + "and trigger assertions in Engine.Operation") +// nocommit. Fix ^^^ please. public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase { // TODO: test for proper exception on unsupported indexes (maybe via separate test?) // We have a 0.20.6.zip etc for this. diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 12827d3bbbf..5f09453987d 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2396,7 +2396,7 @@ public class InternalEngineTests extends ESTestCase { engine.index(index); assertThat(index.version(), equalTo(1L)); - index = new Engine.Index(newUid("1"), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + index = new Engine.Index(newUid("1"), doc, index.seqNo(), index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); replicaEngine.index(index); assertThat(index.version(), equalTo(1L)); @@ -2410,7 +2410,7 @@ public class InternalEngineTests extends ESTestCase { assertEquals(1, topDocs.totalHits); } - index = new Engine.Index(newUid("1"), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + index = new Engine.Index(newUid("1"), doc, index.seqNo(), index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); replicaEngine.index(index); replicaEngine.refresh("test"); try (Engine.Searcher searcher = replicaEngine.acquireSearcher("test")) { @@ -2430,7 +2430,7 @@ public class InternalEngineTests extends ESTestCase { engine.index(firstIndexRequest); assertThat(firstIndexRequest.version(), equalTo(1L)); - Engine.Index firstIndexRequestReplica = new Engine.Index(newUid("1"), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, firstIndexRequest.version(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + Engine.Index firstIndexRequestReplica = new Engine.Index(newUid("1"), doc, firstIndexRequest.seqNo(), firstIndexRequest.version(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); replicaEngine.index(firstIndexRequestReplica); assertThat(firstIndexRequestReplica.version(), equalTo(1L)); @@ -2444,7 +2444,7 @@ public class InternalEngineTests extends ESTestCase { assertEquals(1, topDocs.totalHits); } - Engine.Index secondIndexRequestReplica = new Engine.Index(newUid("1"), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, firstIndexRequest.version(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + Engine.Index secondIndexRequestReplica = new Engine.Index(newUid("1"), doc, secondIndexRequest.seqNo(), firstIndexRequest.version(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); replicaEngine.index(secondIndexRequestReplica); replicaEngine.refresh("test"); try (Engine.Searcher searcher = replicaEngine.acquireSearcher("test")) { @@ -2457,7 +2457,7 @@ public class InternalEngineTests extends ESTestCase { if (randomBoolean()) { return new Engine.Index(newUid(Integer.toString(docId)), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), docId, retry); } - return new Engine.Index(newUid(Integer.toString(docId)), doc, 1, SequenceNumbersService.UNASSIGNED_SEQ_NO, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, System.nanoTime(), docId, retry); + return new Engine.Index(newUid(Integer.toString(docId)), doc, 0, 1, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, System.nanoTime(), docId, retry); } public void testRetryConcurrently() throws InterruptedException, IOException { diff --git a/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java b/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java index 2b52197526a..a0877a3d847 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java @@ -37,6 +37,7 @@ import static org.hamcrest.Matchers.equalTo; @TestLogging("index.shard:TRACE,index.seqno:TRACE") public class CheckpointsIT extends ESIntegTestCase { + @AwaitsFix(bugUrl = "boaz working om this.") public void testCheckpointsAdvance() throws Exception { prepareCreate("test").setSettings( "index.seq_no.checkpoint_sync_interval", "100ms", // update global point frequently diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 7c153736ef5..e25570f4299 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -298,8 +298,7 @@ public class TranslogTests extends ESTestCase { assertThat(stats.estimatedNumberOfOperations(), equalTo(0L)); assertThat(stats.getTranslogSizeInBytes(), equalTo(firstOperationPosition)); assertEquals(6, total.estimatedNumberOfOperations()); - assertEquals(437, total.getTranslogSizeInBytes()); - assertEquals(455, total.getTranslogSizeInBytes()); + assertEquals(461, total.getTranslogSizeInBytes()); BytesStreamOutput out = new BytesStreamOutput(); total.writeTo(out); @@ -307,13 +306,13 @@ public class TranslogTests extends ESTestCase { copy.readFrom(out.bytes().streamInput()); assertEquals(6, copy.estimatedNumberOfOperations()); - assertEquals(455, copy.getTranslogSizeInBytes()); + assertEquals(461, copy.getTranslogSizeInBytes()); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { builder.startObject(); copy.toXContent(builder, ToXContent.EMPTY_PARAMS); builder.endObject(); - assertEquals("{\"translog\":{\"operations\":6,\"size_in_bytes\":455}}", builder.string()); + assertEquals("{\"translog\":{\"operations\":6,\"size_in_bytes\":461}}", builder.string()); } try { @@ -1129,7 +1128,7 @@ public class TranslogTests extends ESTestCase { try (Translog translog = new Translog(config, translogGeneration)) { fail("corrupted"); } catch (IllegalStateException ex) { - assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}"); + assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3178, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}"); } Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); try (Translog translog = new Translog(config, translogGeneration)) { diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index b4f7e989c8b..6098fffff1c 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -21,7 +21,6 @@ package org.elasticsearch.indices.cluster; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; @@ -40,8 +39,8 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndex; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices; import org.elasticsearch.indices.cluster.IndicesClusterStateService.Shard; -import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -51,17 +50,13 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; -import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; -import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.nullValue; /** * Abstract base class for tests against {@link IndicesClusterStateService} @@ -100,7 +95,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC Index index = shardRouting.index(); IndexMetaData indexMetaData = state.metaData().getIndexSafe(index); - MockIndexShard shard = (MockIndexShard) indicesService.getShardOrNull(shardRouting.shardId()); + Shard shard = indicesService.getShardOrNull(shardRouting.shardId()); ShardRouting failedShard = failedShardsCache.get(shardRouting.shardId()); if (enableRandomFailures) { if (shard == null && failedShard == null) { @@ -126,22 +121,6 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC assertTrue("Shard with id " + shardRouting + " expected but missing in indexService", shard != null); // shard has latest shard routing assertThat(shard.routingEntry(), equalTo(shardRouting)); - - final IndexShardRoutingTable shardRoutingTable = state.routingTable().shardRoutingTable(shard.shardId()); - final Set initializingIds = shardRoutingTable.getAllInitializingShards().stream() - .map(r -> r.allocationId().getId()).collect(Collectors.toSet()); - final Set activeIds = shardRoutingTable.activeShards().stream().map(r -> r.allocationId().getId()) - .collect(Collectors.toSet()); - if (shardRouting.primary() == false) { - assertThat(shard.activeIds(), nullValue()); - assertThat(shard.initializingIds(), nullValue()); - } else if (shardRouting.active()) { - assertThat(shard.activeIds(), equalTo(activeIds)); - assertThat(shard.initializingIds(), equalTo(initializingIds)); - } else { - assertThat(shard.activeIds(), anyOf(nullValue(), equalTo(activeIds))); - assertThat(shard.initializingIds(), anyOf(nullValue(), equalTo(initializingIds))); - } } } } @@ -184,11 +163,9 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC private volatile Map indices = emptyMap(); @Override - public synchronized MockIndexService createIndex( - NodeServicesProvider nodeServicesProvider, - IndexMetaData indexMetaData, - List buildInIndexListener, - Consumer globalCheckpointSyncer) throws IOException { + public synchronized MockIndexService createIndex(NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, + List buildInIndexListener, + Consumer globalCheckPointSyncer) throws IOException { MockIndexService indexService = new MockIndexService(new IndexSettings(indexMetaData, Settings.EMPTY)); indices = newMapBuilder(indices).put(indexMetaData.getIndexUUID(), indexService).immutableMap(); return indexService; @@ -328,8 +305,6 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC protected class MockIndexShard implements IndicesClusterStateService.Shard { private volatile ShardRouting shardRouting; private volatile RecoveryState recoveryState; - private volatile Set initializingIds; - private volatile Set activeIds; public MockIndexShard(ShardRouting shardRouting) { this.shardRouting = shardRouting; @@ -362,13 +337,5 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC assert this.shardRouting.isSameAllocation(shardRouting); this.shardRouting = shardRouting; } - - public Set initializingIds() { - return initializingIds; - } - - public Set activeIds() { - return activeIds; - } } } diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 31e7fd606ca..f2fe3d227a7 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -363,7 +363,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice null, null, null, - null); + shardId -> {}); } private class RecordingIndicesService extends MockIndicesService { diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 0678ebef99f..c0d774dbb57 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; @@ -307,6 +306,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { }).when(shard).relocated(any(String.class)); RecoveryTargetHandler targetHandler = mock(RecoveryTargetHandler.class); + when(targetHandler.finalizeRecovery()).thenReturn(new RecoveryTargetHandler.FinalizeResponse("_mock_", 1)); final Supplier currentClusterStateVersionSupplier = () -> { assertFalse(ensureClusterStateVersionCalled.get()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index b6e9c3552fe..dd3c7eb1439 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -57,7 +57,6 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; -import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; @@ -447,7 +446,6 @@ public abstract class IndexShardTestCase extends ESTestCase { if (shard.routingEntry().primary()) { index = shard.prepareIndexOnPrimary( SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)), - SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.MATCH_ANY, VersionType.INTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, @@ -455,7 +453,7 @@ public abstract class IndexShardTestCase extends ESTestCase { } else { index = shard.prepareIndexOnReplica( SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)), - SequenceNumbersService.UNASSIGNED_SEQ_NO, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + randomInt(1 << 10), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); } shard.index(index); return index;