diff --git a/src/test/java/org/elasticsearch/action/update/InternalEngineWithControllableTimingForTesting.java b/src/test/java/org/elasticsearch/action/update/InternalEngineWithControllableTimingForTesting.java deleted file mode 100644 index 7e2cdca5c94..00000000000 --- a/src/test/java/org/elasticsearch/action/update/InternalEngineWithControllableTimingForTesting.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.update; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.analysis.AnalysisService; -import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.EngineException; -import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.index.engine.internal.InternalEngine; -import org.elasticsearch.index.indexing.ShardIndexingService; -import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.merge.policy.MergePolicyProvider; -import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; -import org.elasticsearch.index.settings.IndexSettingsService; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.similarity.SimilarityService; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.indices.warmer.IndicesWarmer; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * An implementation of {@link Engine} only intended for use with {@link TransportUpdateActionTest}. - */ -public class InternalEngineWithControllableTimingForTesting extends InternalEngine implements Engine { - - /* - * Not the best programming practice, but a simple way to make the instance accessible from test classes. The - * "cleaner" way - making the appropriate Guice injector of the respective index available to the test class is - * rather difficult and fragile, too. As long as tests requiring multiple instances of this class are not run in - * parallel, everything will be fine. Currently, there is just a single test suite that uses only a single instance - * anyway. - */ - public static InternalEngineWithControllableTimingForTesting currentTestInstance; - - private AtomicBoolean nextGetThrowsException = new AtomicBoolean(); - - private Semaphore createOperationReceived = new Semaphore(0); - private Semaphore letCreateOperationBegin = new Semaphore(0); - private Semaphore createOperationFinished = new Semaphore(0); - private Semaphore letCreateOperationReturn = new Semaphore(0); - - private Semaphore indexOperationReceived = new Semaphore(0); - private Semaphore letIndexOperationBegin = new Semaphore(0); - private Semaphore indexOperationFinished = new Semaphore(0); - private Semaphore letIndexOperationReturn = new Semaphore(0); - - private Semaphore deleteOperationReceived = new Semaphore(0); - private Semaphore letDeleteOperationBegin = new Semaphore(0); - private Semaphore deleteOperationFinished = new Semaphore(0); - private Semaphore letDeleteOperationReturn = new Semaphore(0); - - // safety timeout so that if something goes wrong the test does not block forever - private static final long SEMAPHORE_ACQUIRE_TIMEOUT_SECONDS = 5; - - @Inject - public InternalEngineWithControllableTimingForTesting(ShardId shardId, Settings indexSettings, ThreadPool threadPool, - IndexSettingsService indexSettingsService, ShardIndexingService indexingService, IndicesWarmer warmer, Store store, - SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider, - MergeSchedulerProvider mergeScheduler, AnalysisService analysisService, SimilarityService similarityService, - CodecService codecService) throws EngineException { - super(shardId, indexSettings, threadPool, indexSettingsService, indexingService, warmer, store, deletionPolicy, translog, - mergePolicyProvider, mergeScheduler, analysisService, similarityService, codecService); - // 'this' escapes from the constructor, but for the purpose of this test it is fine. - currentTestInstance = this; - } - - @Override - public GetResult get(Get get) throws EngineException { - if (nextGetThrowsException.getAndSet(false)) { - Uid uid = Uid.createUid(get.uid().text()); - long dummyVersion = 1000L; - throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), dummyVersion, get.version()); - } - return super.get(get); - } - - private void acquireWithTimeout(Semaphore semaphore) { - try { - boolean acquired = semaphore.tryAcquire(SEMAPHORE_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (!acquired){ - throw new RuntimeException("(Integration test:) Cannot acquire semaphore within the specified timeout of " - + SEMAPHORE_ACQUIRE_TIMEOUT_SECONDS + " seconds"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - @Override - public void create(Create create) throws EngineException { - createOperationReceived.release(); - acquireWithTimeout(letCreateOperationBegin); - try { - super.create(create); - } finally { - createOperationFinished.release(); - acquireWithTimeout(letCreateOperationReturn); - } - } - - @Override - public void index(Index index) throws EngineException { - indexOperationReceived.release(); - acquireWithTimeout(letIndexOperationBegin); - try { - super.index(index); - } finally { - indexOperationFinished.release(); - acquireWithTimeout(letIndexOperationReturn); - } - } - - @Override - public void delete(Delete delete) throws EngineException { - deleteOperationReceived.release(); - acquireWithTimeout(letDeleteOperationBegin); - try { - super.delete(delete); - } finally { - deleteOperationFinished.release(); - acquireWithTimeout(letDeleteOperationReturn); - } - } - - public void letNextGetThrowException() { - nextGetThrowsException.set(true); - } - - public void waitUntilCreateOperationReceived() { - acquireWithTimeout(createOperationReceived); - } - - public void letCreateOperationBegin() { - letCreateOperationBegin.release(); - } - - public void waitUntilCreateOperationFinished() { - acquireWithTimeout(createOperationFinished); - } - - public void letCreateOperationReturn() { - letCreateOperationReturn.release(); - } - - public void waitUntilIndexOperationReceived() { - acquireWithTimeout(indexOperationReceived); - } - - public void letIndexOperationBegin() { - letIndexOperationBegin.release(); - } - - public void waitUntilIndexOperationFinished() { - acquireWithTimeout(indexOperationFinished); - } - - public void letIndexOperationReturn() { - letIndexOperationReturn.release(); - } - - public void waitUntilDeleteOperationReceived() { - acquireWithTimeout(deleteOperationReceived); - } - - public void letDeleteOperationBegin() { - letDeleteOperationBegin.release(); - } - - public void waitUntilDeleteOperationFinished() { - acquireWithTimeout(deleteOperationFinished); - } - - public void letDeleteOperationReturn() { - letDeleteOperationReturn.release(); - } - - public void resetSemaphores() { - nextGetThrowsException = new AtomicBoolean(); - - createOperationReceived = new Semaphore(0); - letCreateOperationBegin = new Semaphore(0); - createOperationFinished = new Semaphore(0); - letCreateOperationReturn = new Semaphore(0); - - indexOperationReceived = new Semaphore(0); - letIndexOperationBegin = new Semaphore(0); - indexOperationFinished = new Semaphore(0); - letIndexOperationReturn = new Semaphore(0); - - deleteOperationReceived = new Semaphore(0); - letDeleteOperationBegin = new Semaphore(0); - deleteOperationFinished = new Semaphore(0); - letDeleteOperationReturn = new Semaphore(0); - } -} diff --git a/src/test/java/org/elasticsearch/action/update/InternalEngineWithControllableTimingForTestingModule.java b/src/test/java/org/elasticsearch/action/update/InternalEngineWithControllableTimingForTestingModule.java deleted file mode 100644 index 075ec808b01..00000000000 --- a/src/test/java/org/elasticsearch/action/update/InternalEngineWithControllableTimingForTestingModule.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.update; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.index.engine.Engine; - -/** - * Provides an implementation of {@link Engine} only intended for use with {@link TransportUpdateActionTest}. - */ -public class InternalEngineWithControllableTimingForTestingModule extends AbstractModule { - - @Override - protected void configure() { - bind(Engine.class).to(InternalEngineWithControllableTimingForTesting.class).asEagerSingleton(); - } -} diff --git a/src/test/java/org/elasticsearch/action/update/TransportUpdateActionTest.java b/src/test/java/org/elasticsearch/action/update/TransportUpdateActionTest.java deleted file mode 100644 index dec1b82163a..00000000000 --- a/src/test/java/org/elasticsearch/action/update/TransportUpdateActionTest.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.update; - -import org.apache.lucene.util.LuceneTestCase; -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.action.ListenableActionFuture; -import org.elasticsearch.action.delete.DeleteResponse; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.engine.DocumentMissingException; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.IndexEngineModule; -import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.UUID; - -/** - * Tests to demonstrate the bug discussed in issue #6355 and pull request #6724. The tests validate the fix for that bug - * and similar situations. - */ -public class TransportUpdateActionTest extends ElasticsearchIntegrationTest { - - private static final String TYPE_NAME = "some-type"; - private static final TimeValue WAIT_FOR_RESPONSE_TIMEOUT = TimeValue.timeValueSeconds(5); - - private static InternalEngineWithControllableTimingForTesting internalEngine; - - private final String indexName = "issue-6355-test-index-" + UUID.randomUUID(); - private final String documentId = UUID.randomUUID().toString(); - - /** - * Creates an index which uses an implementation of {@link Engine} that allows the test to enforce specific timing - * (and, if needed, exceptional behavior) for request handling. - */ - @Before - public void createTestIndexAndInitTestEngine() { - client().admin() - .indices() - .prepareCreate(indexName) - .setSettings( - ImmutableSettings - .settingsBuilder() - .put(IndexEngineModule.EngineSettings.ENGINE_TYPE, - InternalEngineWithControllableTimingForTestingModule.class.getName()).put("number_of_shards", 1) - .put("number_of_replicas", 0).build()).execute().actionGet(); - internalEngine = InternalEngineWithControllableTimingForTesting.currentTestInstance; - } - - @After - public void resetTestEngine() { - internalEngine.resetSemaphores(); - } - - @AfterClass - public static void clearStaticFields() { - internalEngine = null; - } - - /** - * Checks correct behavior in case of an update request that does not include a retry_on_conflict parameter. - */ - @Test - @LuceneTestCase.AwaitsFix(bugUrl = "ignore for now, is being worked on") - public void shouldReceiveUpdateResponseWhenNotRetryingOnConflict() { - sendAddRequestAndWaitUntilCompletelyDone(documentId); - - ListenableActionFuture updateFuture = executeUpdateAndDeleteRequestsWithIntendedTiming(documentId, 0); - - verifyUpdateResponseCompletesExceptionally(updateFuture, VersionConflictEngineException.class); - } - - /** - * Checks correct behavior in case of an update request that includes a retry_on_conflict parameter set to 1. This - * test demonstrates the bug described in issue #6355 and validates the fix. The scenario is based on real client - * requests that happen with a particular timing. - */ - @Test - @LuceneTestCase.AwaitsFix(bugUrl = "ignore for now, is being worked on") - public void shouldReceiveUpdateResponseWhenRetryingOnConflict() { - sendAddRequestAndWaitUntilCompletelyDone(documentId); - - ListenableActionFuture updateFuture = executeUpdateAndDeleteRequestsWithIntendedTiming(documentId, 1); - - verifyUpdateResponseCompletesExceptionally(updateFuture, DocumentMissingException.class); - } - - /** - * Checks correct behavior in case of an upsert request that does not include a retry_on_conflict parameter. - */ - @Test - public void shouldReceiveUpsertResponseWhenNotRetryingOnConflict() { - ListenableActionFuture upsertFuture = updateDocument(documentId, true, 0); - - internalEngine.letNextGetThrowException(); - - verifyUpdateResponseCompletesExceptionally(upsertFuture, VersionConflictEngineException.class); - } - - /** - * Checks correct behavior in case of an upsert request that includes a retry_on_conflict parameter set to 1. This - * was not originally covered by issue #6355, but the discussion for pull request #6724 revealed that the same bug - * may appear here as well. The scenario is based on a fake exception thrown when getting a document from the - * {@link Engine}, because unlike for the "update" case it turns out to be quite difficult to set up real client - * requests to trigger an exception in {@link UpdateHelper#prepare(UpdateRequest)} for the "upsert" case. - */ - @Test - @LuceneTestCase.AwaitsFix(bugUrl = "ignore for now, is being worked on") - public void shouldReceiveUpsertResponseWhenRetryingOnConflict() { - // send an upsert request and make sure the corresponding create operation has arrived in the internal engine - ListenableActionFuture upsertFuture = updateDocument(documentId, true, 1); - internalEngine.waitUntilCreateOperationReceived(); - - // let another request cause a version conflict, triggering a retry of the upsert request - sendAddRequestAndWaitUntilCompletelyDone(documentId); - - // now complete the update request, but make sure the retry is going to fail when getting the document - internalEngine.letCreateOperationBegin(); - internalEngine.waitUntilCreateOperationFinished(); - - internalEngine.letNextGetThrowException(); - - internalEngine.letCreateOperationReturn(); - - verifyUpdateResponseCompletesExceptionally(upsertFuture, VersionConflictEngineException.class); - } - - private void verifyUpdateResponseCompletesExceptionally(ListenableActionFuture updateFuture, - Class expectedExceptionClass) { - try { - updateFuture.actionGet(WAIT_FOR_RESPONSE_TIMEOUT); - fail("Future for update request did not complete exceptionally, but should."); - } catch (ElasticsearchTimeoutException e) { - fail("Future for update request did not complete within " + WAIT_FOR_RESPONSE_TIMEOUT); - } catch (Exception e) { - assertTrue("Future for update request completed with an unexpected exception.", expectedExceptionClass.isInstance(e)); - } - } - - private ListenableActionFuture executeUpdateAndDeleteRequestsWithIntendedTiming(String documentId, int retryOnConflict) { - // send an update request and make sure the corresponding index operation has arrived in the internal engine - ListenableActionFuture updateFuture = updateDocument(documentId, false, retryOnConflict); - internalEngine.waitUntilIndexOperationReceived(); - - sendDeleteRequestAndWaitUntilCompletelyDone(documentId); - - // now complete the index operation of the update request - internalEngine.letIndexOperationBegin(); - internalEngine.waitUntilIndexOperationFinished(); - internalEngine.letIndexOperationReturn(); - - return updateFuture; - } - - private void sendAddRequestAndWaitUntilCompletelyDone(String documentId) { - ListenableActionFuture addFuture = addDocument(documentId); - - internalEngine.waitUntilIndexOperationReceived(); - internalEngine.letIndexOperationBegin(); - internalEngine.waitUntilIndexOperationFinished(); - internalEngine.letIndexOperationReturn(); - - waitUntilResponseReceived(addFuture); - } - - private void sendDeleteRequestAndWaitUntilCompletelyDone(String documentId) { - ListenableActionFuture deleteFuture = deleteDocument(documentId); - - internalEngine.waitUntilDeleteOperationReceived(); - internalEngine.letDeleteOperationBegin(); - internalEngine.waitUntilDeleteOperationFinished(); - internalEngine.letDeleteOperationReturn(); - - waitUntilResponseReceived(deleteFuture); - } - - private void waitUntilResponseReceived(ListenableActionFuture responseFuture) { - responseFuture.actionGet(WAIT_FOR_RESPONSE_TIMEOUT); - } - - private ListenableActionFuture addDocument(String id) { - return client().prepareIndex(indexName, TYPE_NAME, id).setSource(buildJson(id)).execute(); - } - - private ListenableActionFuture updateDocument(String id, boolean upsert, int retryOnConflict) { - return client().prepareUpdate(indexName, TYPE_NAME, id).setDoc(buildJson(id)).setDocAsUpsert(upsert) - .setRetryOnConflict(retryOnConflict).execute(); - } - - private ListenableActionFuture deleteDocument(String id) { - return client().prepareDelete(indexName, TYPE_NAME, id).execute(); - } - - private XContentBuilder buildJson(String id) { - try { - return XContentFactory.jsonBuilder().startObject().field("id", id).endObject(); - } catch (IOException e) { - throw new RuntimeException("Not going to happen!"); - } - } -}