[TEST] Remove TransportUpdateActionTest
This test has been made obselete by the UpdateTests.
This commit is contained in:
parent
618b51b4cd
commit
6d641ea40d
|
@ -1,221 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.update;
|
||||
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.analysis.AnalysisService;
|
||||
import org.elasticsearch.index.codec.CodecService;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineException;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.engine.internal.InternalEngine;
|
||||
import org.elasticsearch.index.indexing.ShardIndexingService;
|
||||
import org.elasticsearch.index.mapper.Uid;
|
||||
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
|
||||
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
|
||||
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.warmer.IndicesWarmer;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* An implementation of {@link Engine} only intended for use with {@link TransportUpdateActionTest}.
|
||||
*/
|
||||
public class InternalEngineWithControllableTimingForTesting extends InternalEngine implements Engine {
|
||||
|
||||
/*
|
||||
* Not the best programming practice, but a simple way to make the instance accessible from test classes. The
|
||||
* "cleaner" way - making the appropriate Guice injector of the respective index available to the test class is
|
||||
* rather difficult and fragile, too. As long as tests requiring multiple instances of this class are not run in
|
||||
* parallel, everything will be fine. Currently, there is just a single test suite that uses only a single instance
|
||||
* anyway.
|
||||
*/
|
||||
public static InternalEngineWithControllableTimingForTesting currentTestInstance;
|
||||
|
||||
private AtomicBoolean nextGetThrowsException = new AtomicBoolean();
|
||||
|
||||
private Semaphore createOperationReceived = new Semaphore(0);
|
||||
private Semaphore letCreateOperationBegin = new Semaphore(0);
|
||||
private Semaphore createOperationFinished = new Semaphore(0);
|
||||
private Semaphore letCreateOperationReturn = new Semaphore(0);
|
||||
|
||||
private Semaphore indexOperationReceived = new Semaphore(0);
|
||||
private Semaphore letIndexOperationBegin = new Semaphore(0);
|
||||
private Semaphore indexOperationFinished = new Semaphore(0);
|
||||
private Semaphore letIndexOperationReturn = new Semaphore(0);
|
||||
|
||||
private Semaphore deleteOperationReceived = new Semaphore(0);
|
||||
private Semaphore letDeleteOperationBegin = new Semaphore(0);
|
||||
private Semaphore deleteOperationFinished = new Semaphore(0);
|
||||
private Semaphore letDeleteOperationReturn = new Semaphore(0);
|
||||
|
||||
// safety timeout so that if something goes wrong the test does not block forever
|
||||
private static final long SEMAPHORE_ACQUIRE_TIMEOUT_SECONDS = 5;
|
||||
|
||||
@Inject
|
||||
public InternalEngineWithControllableTimingForTesting(ShardId shardId, Settings indexSettings, ThreadPool threadPool,
|
||||
IndexSettingsService indexSettingsService, ShardIndexingService indexingService, IndicesWarmer warmer, Store store,
|
||||
SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider,
|
||||
MergeSchedulerProvider mergeScheduler, AnalysisService analysisService, SimilarityService similarityService,
|
||||
CodecService codecService) throws EngineException {
|
||||
super(shardId, indexSettings, threadPool, indexSettingsService, indexingService, warmer, store, deletionPolicy, translog,
|
||||
mergePolicyProvider, mergeScheduler, analysisService, similarityService, codecService);
|
||||
// 'this' escapes from the constructor, but for the purpose of this test it is fine.
|
||||
currentTestInstance = this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetResult get(Get get) throws EngineException {
|
||||
if (nextGetThrowsException.getAndSet(false)) {
|
||||
Uid uid = Uid.createUid(get.uid().text());
|
||||
long dummyVersion = 1000L;
|
||||
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), dummyVersion, get.version());
|
||||
}
|
||||
return super.get(get);
|
||||
}
|
||||
|
||||
private void acquireWithTimeout(Semaphore semaphore) {
|
||||
try {
|
||||
boolean acquired = semaphore.tryAcquire(SEMAPHORE_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
||||
if (!acquired){
|
||||
throw new RuntimeException("(Integration test:) Cannot acquire semaphore within the specified timeout of "
|
||||
+ SEMAPHORE_ACQUIRE_TIMEOUT_SECONDS + " seconds");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void create(Create create) throws EngineException {
|
||||
createOperationReceived.release();
|
||||
acquireWithTimeout(letCreateOperationBegin);
|
||||
try {
|
||||
super.create(create);
|
||||
} finally {
|
||||
createOperationFinished.release();
|
||||
acquireWithTimeout(letCreateOperationReturn);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void index(Index index) throws EngineException {
|
||||
indexOperationReceived.release();
|
||||
acquireWithTimeout(letIndexOperationBegin);
|
||||
try {
|
||||
super.index(index);
|
||||
} finally {
|
||||
indexOperationFinished.release();
|
||||
acquireWithTimeout(letIndexOperationReturn);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(Delete delete) throws EngineException {
|
||||
deleteOperationReceived.release();
|
||||
acquireWithTimeout(letDeleteOperationBegin);
|
||||
try {
|
||||
super.delete(delete);
|
||||
} finally {
|
||||
deleteOperationFinished.release();
|
||||
acquireWithTimeout(letDeleteOperationReturn);
|
||||
}
|
||||
}
|
||||
|
||||
public void letNextGetThrowException() {
|
||||
nextGetThrowsException.set(true);
|
||||
}
|
||||
|
||||
public void waitUntilCreateOperationReceived() {
|
||||
acquireWithTimeout(createOperationReceived);
|
||||
}
|
||||
|
||||
public void letCreateOperationBegin() {
|
||||
letCreateOperationBegin.release();
|
||||
}
|
||||
|
||||
public void waitUntilCreateOperationFinished() {
|
||||
acquireWithTimeout(createOperationFinished);
|
||||
}
|
||||
|
||||
public void letCreateOperationReturn() {
|
||||
letCreateOperationReturn.release();
|
||||
}
|
||||
|
||||
public void waitUntilIndexOperationReceived() {
|
||||
acquireWithTimeout(indexOperationReceived);
|
||||
}
|
||||
|
||||
public void letIndexOperationBegin() {
|
||||
letIndexOperationBegin.release();
|
||||
}
|
||||
|
||||
public void waitUntilIndexOperationFinished() {
|
||||
acquireWithTimeout(indexOperationFinished);
|
||||
}
|
||||
|
||||
public void letIndexOperationReturn() {
|
||||
letIndexOperationReturn.release();
|
||||
}
|
||||
|
||||
public void waitUntilDeleteOperationReceived() {
|
||||
acquireWithTimeout(deleteOperationReceived);
|
||||
}
|
||||
|
||||
public void letDeleteOperationBegin() {
|
||||
letDeleteOperationBegin.release();
|
||||
}
|
||||
|
||||
public void waitUntilDeleteOperationFinished() {
|
||||
acquireWithTimeout(deleteOperationFinished);
|
||||
}
|
||||
|
||||
public void letDeleteOperationReturn() {
|
||||
letDeleteOperationReturn.release();
|
||||
}
|
||||
|
||||
public void resetSemaphores() {
|
||||
nextGetThrowsException = new AtomicBoolean();
|
||||
|
||||
createOperationReceived = new Semaphore(0);
|
||||
letCreateOperationBegin = new Semaphore(0);
|
||||
createOperationFinished = new Semaphore(0);
|
||||
letCreateOperationReturn = new Semaphore(0);
|
||||
|
||||
indexOperationReceived = new Semaphore(0);
|
||||
letIndexOperationBegin = new Semaphore(0);
|
||||
indexOperationFinished = new Semaphore(0);
|
||||
letIndexOperationReturn = new Semaphore(0);
|
||||
|
||||
deleteOperationReceived = new Semaphore(0);
|
||||
letDeleteOperationBegin = new Semaphore(0);
|
||||
deleteOperationFinished = new Semaphore(0);
|
||||
letDeleteOperationReturn = new Semaphore(0);
|
||||
}
|
||||
}
|
|
@ -1,34 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.update;
|
||||
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
|
||||
/**
|
||||
* Provides an implementation of {@link Engine} only intended for use with {@link TransportUpdateActionTest}.
|
||||
*/
|
||||
public class InternalEngineWithControllableTimingForTestingModule extends AbstractModule {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(Engine.class).to(InternalEngineWithControllableTimingForTesting.class).asEagerSingleton();
|
||||
}
|
||||
}
|
|
@ -1,227 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.update;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.ListenableActionFuture;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.engine.DocumentMissingException;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.IndexEngineModule;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Tests to demonstrate the bug discussed in issue #6355 and pull request #6724. The tests validate the fix for that bug
|
||||
* and similar situations.
|
||||
*/
|
||||
public class TransportUpdateActionTest extends ElasticsearchIntegrationTest {
|
||||
|
||||
private static final String TYPE_NAME = "some-type";
|
||||
private static final TimeValue WAIT_FOR_RESPONSE_TIMEOUT = TimeValue.timeValueSeconds(5);
|
||||
|
||||
private static InternalEngineWithControllableTimingForTesting internalEngine;
|
||||
|
||||
private final String indexName = "issue-6355-test-index-" + UUID.randomUUID();
|
||||
private final String documentId = UUID.randomUUID().toString();
|
||||
|
||||
/**
|
||||
* Creates an index which uses an implementation of {@link Engine} that allows the test to enforce specific timing
|
||||
* (and, if needed, exceptional behavior) for request handling.
|
||||
*/
|
||||
@Before
|
||||
public void createTestIndexAndInitTestEngine() {
|
||||
client().admin()
|
||||
.indices()
|
||||
.prepareCreate(indexName)
|
||||
.setSettings(
|
||||
ImmutableSettings
|
||||
.settingsBuilder()
|
||||
.put(IndexEngineModule.EngineSettings.ENGINE_TYPE,
|
||||
InternalEngineWithControllableTimingForTestingModule.class.getName()).put("number_of_shards", 1)
|
||||
.put("number_of_replicas", 0).build()).execute().actionGet();
|
||||
internalEngine = InternalEngineWithControllableTimingForTesting.currentTestInstance;
|
||||
}
|
||||
|
||||
@After
|
||||
public void resetTestEngine() {
|
||||
internalEngine.resetSemaphores();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void clearStaticFields() {
|
||||
internalEngine = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks correct behavior in case of an update request that does not include a retry_on_conflict parameter.
|
||||
*/
|
||||
@Test
|
||||
@LuceneTestCase.AwaitsFix(bugUrl = "ignore for now, is being worked on")
|
||||
public void shouldReceiveUpdateResponseWhenNotRetryingOnConflict() {
|
||||
sendAddRequestAndWaitUntilCompletelyDone(documentId);
|
||||
|
||||
ListenableActionFuture<UpdateResponse> updateFuture = executeUpdateAndDeleteRequestsWithIntendedTiming(documentId, 0);
|
||||
|
||||
verifyUpdateResponseCompletesExceptionally(updateFuture, VersionConflictEngineException.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks correct behavior in case of an update request that includes a retry_on_conflict parameter set to 1. This
|
||||
* test demonstrates the bug described in issue #6355 and validates the fix. The scenario is based on real client
|
||||
* requests that happen with a particular timing.
|
||||
*/
|
||||
@Test
|
||||
@LuceneTestCase.AwaitsFix(bugUrl = "ignore for now, is being worked on")
|
||||
public void shouldReceiveUpdateResponseWhenRetryingOnConflict() {
|
||||
sendAddRequestAndWaitUntilCompletelyDone(documentId);
|
||||
|
||||
ListenableActionFuture<UpdateResponse> updateFuture = executeUpdateAndDeleteRequestsWithIntendedTiming(documentId, 1);
|
||||
|
||||
verifyUpdateResponseCompletesExceptionally(updateFuture, DocumentMissingException.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks correct behavior in case of an upsert request that does not include a retry_on_conflict parameter.
|
||||
*/
|
||||
@Test
|
||||
public void shouldReceiveUpsertResponseWhenNotRetryingOnConflict() {
|
||||
ListenableActionFuture<UpdateResponse> upsertFuture = updateDocument(documentId, true, 0);
|
||||
|
||||
internalEngine.letNextGetThrowException();
|
||||
|
||||
verifyUpdateResponseCompletesExceptionally(upsertFuture, VersionConflictEngineException.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks correct behavior in case of an upsert request that includes a retry_on_conflict parameter set to 1. This
|
||||
* was not originally covered by issue #6355, but the discussion for pull request #6724 revealed that the same bug
|
||||
* may appear here as well. The scenario is based on a fake exception thrown when getting a document from the
|
||||
* {@link Engine}, because unlike for the "update" case it turns out to be quite difficult to set up real client
|
||||
* requests to trigger an exception in {@link UpdateHelper#prepare(UpdateRequest)} for the "upsert" case.
|
||||
*/
|
||||
@Test
|
||||
@LuceneTestCase.AwaitsFix(bugUrl = "ignore for now, is being worked on")
|
||||
public void shouldReceiveUpsertResponseWhenRetryingOnConflict() {
|
||||
// send an upsert request and make sure the corresponding create operation has arrived in the internal engine
|
||||
ListenableActionFuture<UpdateResponse> upsertFuture = updateDocument(documentId, true, 1);
|
||||
internalEngine.waitUntilCreateOperationReceived();
|
||||
|
||||
// let another request cause a version conflict, triggering a retry of the upsert request
|
||||
sendAddRequestAndWaitUntilCompletelyDone(documentId);
|
||||
|
||||
// now complete the update request, but make sure the retry is going to fail when getting the document
|
||||
internalEngine.letCreateOperationBegin();
|
||||
internalEngine.waitUntilCreateOperationFinished();
|
||||
|
||||
internalEngine.letNextGetThrowException();
|
||||
|
||||
internalEngine.letCreateOperationReturn();
|
||||
|
||||
verifyUpdateResponseCompletesExceptionally(upsertFuture, VersionConflictEngineException.class);
|
||||
}
|
||||
|
||||
private void verifyUpdateResponseCompletesExceptionally(ListenableActionFuture<UpdateResponse> updateFuture,
|
||||
Class<? extends Exception> expectedExceptionClass) {
|
||||
try {
|
||||
updateFuture.actionGet(WAIT_FOR_RESPONSE_TIMEOUT);
|
||||
fail("Future for update request did not complete exceptionally, but should.");
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
fail("Future for update request did not complete within " + WAIT_FOR_RESPONSE_TIMEOUT);
|
||||
} catch (Exception e) {
|
||||
assertTrue("Future for update request completed with an unexpected exception.", expectedExceptionClass.isInstance(e));
|
||||
}
|
||||
}
|
||||
|
||||
private ListenableActionFuture<UpdateResponse> executeUpdateAndDeleteRequestsWithIntendedTiming(String documentId, int retryOnConflict) {
|
||||
// send an update request and make sure the corresponding index operation has arrived in the internal engine
|
||||
ListenableActionFuture<UpdateResponse> updateFuture = updateDocument(documentId, false, retryOnConflict);
|
||||
internalEngine.waitUntilIndexOperationReceived();
|
||||
|
||||
sendDeleteRequestAndWaitUntilCompletelyDone(documentId);
|
||||
|
||||
// now complete the index operation of the update request
|
||||
internalEngine.letIndexOperationBegin();
|
||||
internalEngine.waitUntilIndexOperationFinished();
|
||||
internalEngine.letIndexOperationReturn();
|
||||
|
||||
return updateFuture;
|
||||
}
|
||||
|
||||
private void sendAddRequestAndWaitUntilCompletelyDone(String documentId) {
|
||||
ListenableActionFuture<IndexResponse> addFuture = addDocument(documentId);
|
||||
|
||||
internalEngine.waitUntilIndexOperationReceived();
|
||||
internalEngine.letIndexOperationBegin();
|
||||
internalEngine.waitUntilIndexOperationFinished();
|
||||
internalEngine.letIndexOperationReturn();
|
||||
|
||||
waitUntilResponseReceived(addFuture);
|
||||
}
|
||||
|
||||
private void sendDeleteRequestAndWaitUntilCompletelyDone(String documentId) {
|
||||
ListenableActionFuture<DeleteResponse> deleteFuture = deleteDocument(documentId);
|
||||
|
||||
internalEngine.waitUntilDeleteOperationReceived();
|
||||
internalEngine.letDeleteOperationBegin();
|
||||
internalEngine.waitUntilDeleteOperationFinished();
|
||||
internalEngine.letDeleteOperationReturn();
|
||||
|
||||
waitUntilResponseReceived(deleteFuture);
|
||||
}
|
||||
|
||||
private void waitUntilResponseReceived(ListenableActionFuture<?> responseFuture) {
|
||||
responseFuture.actionGet(WAIT_FOR_RESPONSE_TIMEOUT);
|
||||
}
|
||||
|
||||
private ListenableActionFuture<IndexResponse> addDocument(String id) {
|
||||
return client().prepareIndex(indexName, TYPE_NAME, id).setSource(buildJson(id)).execute();
|
||||
}
|
||||
|
||||
private ListenableActionFuture<UpdateResponse> updateDocument(String id, boolean upsert, int retryOnConflict) {
|
||||
return client().prepareUpdate(indexName, TYPE_NAME, id).setDoc(buildJson(id)).setDocAsUpsert(upsert)
|
||||
.setRetryOnConflict(retryOnConflict).execute();
|
||||
}
|
||||
|
||||
private ListenableActionFuture<DeleteResponse> deleteDocument(String id) {
|
||||
return client().prepareDelete(indexName, TYPE_NAME, id).execute();
|
||||
}
|
||||
|
||||
private XContentBuilder buildJson(String id) {
|
||||
try {
|
||||
return XContentFactory.jsonBuilder().startObject().field("id", id).endObject();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Not going to happen!");
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue