diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy index fd2b81f14ce..e58a87238c5 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy @@ -657,11 +657,11 @@ class ClusterFormationTasks { standardOutput = new ByteArrayOutputStream() doLast { String out = standardOutput.toString() - if (out.contains("${pid} org.elasticsearch.bootstrap.Elasticsearch") == false) { + if (out.contains("${ext.pid} org.elasticsearch.bootstrap.Elasticsearch") == false) { logger.error('jps -l') logger.error(out) - logger.error("pid file: ${pidFile}") - logger.error("pid: ${pid}") + logger.error("pid file: ${node.pidFile}") + logger.error("pid: ${ext.pid}") throw new GradleException("jps -l did not report any process with org.elasticsearch.bootstrap.Elasticsearch\n" + "Did you run gradle clean? Maybe an old pid file is still lying around.") } else { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 8db50535cc8..4abfd713bcd 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -55,7 +55,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collections; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonMap; @@ -63,16 +62,6 @@ import static java.util.Collections.singletonMap; public class CrudIT extends ESRestHighLevelClientTestCase { public void testDelete() throws IOException { - { - // Testing non existing document - String docId = "does_not_exist"; - DeleteRequest deleteRequest = new DeleteRequest("index", "type", docId); - DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync); - assertEquals("index", deleteResponse.getIndex()); - assertEquals("type", deleteResponse.getType()); - assertEquals(docId, deleteResponse.getId()); - assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteResponse.getResult()); - } { // Testing deletion String docId = "id"; @@ -87,6 +76,16 @@ public class CrudIT extends ESRestHighLevelClientTestCase { assertEquals(docId, deleteResponse.getId()); assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); } + { + // Testing non existing document + String docId = "does_not_exist"; + DeleteRequest deleteRequest = new DeleteRequest("index", "type", docId); + DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync); + assertEquals("index", deleteResponse.getIndex()); + assertEquals("type", deleteResponse.getType()); + assertEquals(docId, deleteResponse.getId()); + assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteResponse.getResult()); + } { // Testing version conflict String docId = "version_conflict"; diff --git a/core/src/main/java/org/elasticsearch/Version.java b/core/src/main/java/org/elasticsearch/Version.java index 1e60a4ac83c..6f5afa926d3 100644 --- a/core/src/main/java/org/elasticsearch/Version.java +++ b/core/src/main/java/org/elasticsearch/Version.java @@ -72,6 +72,8 @@ public class Version implements Comparable { public static final Version V_5_3_1 = new Version(V_5_3_1_ID, org.apache.lucene.util.Version.LUCENE_6_4_2); public static final int V_5_3_2_ID = 5030299; public static final Version V_5_3_2 = new Version(V_5_3_2_ID, org.apache.lucene.util.Version.LUCENE_6_4_2); + public static final int V_5_3_3_ID_UNRELEASED = 5030399; + public static final Version V_5_3_3_UNRELEASED = new Version(V_5_3_3_ID_UNRELEASED, org.apache.lucene.util.Version.LUCENE_6_4_2); public static final int V_5_4_0_ID = 5040099; public static final Version V_5_4_0 = new Version(V_5_4_0_ID, org.apache.lucene.util.Version.LUCENE_6_5_0); public static final int V_5_5_0_ID_UNRELEASED = 5050099; @@ -105,6 +107,8 @@ public class Version implements Comparable { return V_5_5_0_UNRELEASED; case V_5_4_0_ID: return V_5_4_0; + case V_5_3_3_ID_UNRELEASED: + return V_5_3_3_UNRELEASED; case V_5_3_2_ID: return V_5_3_2; case V_5_3_1_ID: diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 16c22524e2d..ebf50e770bc 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -54,6 +54,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.ingest.IngestService; @@ -144,6 +145,11 @@ public class TransportBulkAction extends HandledTransportAction indices = bulkRequest.requests.stream() + // delete requests should not attempt to create the index (if the index does not + // exists), unless an external versioning is used + .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE + || request.versionType() == VersionType.EXTERNAL + || request.versionType() == VersionType.EXTERNAL_GTE) .map(DocWriteRequest::index) .collect(Collectors.toSet()); /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index a8ee1677bbd..b3dcf6fe1ef 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -38,7 +38,6 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -52,7 +51,6 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; @@ -179,8 +177,8 @@ public abstract class TransportReplicationAction< Request shardRequest, IndexShard primary) throws Exception; /** - * Synchronous replica operation on nodes with replica copies. This is done under the lock form - * {@link IndexShard#acquireReplicaOperationLock(long, ActionListener, String)} + * Synchronously execute the specified replica operation. This is done under a permit from + * {@link IndexShard#acquireReplicaOperationPermit(long, ActionListener, String)}. * * @param shardRequest the request to the replica shard * @param replica the replica shard to perform the operation on @@ -584,7 +582,7 @@ public abstract class TransportReplicationAction< throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID, actualAllocationId); } - replica.acquireReplicaOperationLock(request.primaryTerm, this, executor); + replica.acquireReplicaOperationPermit(request.primaryTerm, this, executor); } /** @@ -921,7 +919,7 @@ public abstract class TransportReplicationAction< } }; - indexShard.acquirePrimaryOperationLock(onAcquired, executor); + indexShard.acquirePrimaryOperationPermit(onAcquired, executor); } class ShardReference implements Releasable { diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index cf41ad8ec1d..a4ab30116f7 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -612,11 +612,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust rescheduleFsyncTask(durability); } } - - // update primary terms - for (final IndexShard shard : this.shards.values()) { - shard.updatePrimaryTerm(metadata.primaryTerm(shard.shardId().id())); - } } private void rescheduleFsyncTask(Translog.Durability durability) { diff --git a/core/src/main/java/org/elasticsearch/index/query/QueryShardContext.java b/core/src/main/java/org/elasticsearch/index/query/QueryShardContext.java index ce6439ae1be..31b2a71846c 100644 --- a/core/src/main/java/org/elasticsearch/index/query/QueryShardContext.java +++ b/core/src/main/java/org/elasticsearch/index/query/QueryShardContext.java @@ -333,7 +333,8 @@ public class QueryShardContext extends QueryRewriteContext { */ public final SearchScript getSearchScript(Script script, ScriptContext context) { failIfFrozen(); - return scriptService.search(lookup(), script, context); + CompiledScript compile = scriptService.compile(script, context); + return scriptService.search(lookup(), compile, script.getParams()); } /** * Returns a lazily created {@link SearchScript} that is compiled immediately but can be pulled later once all diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 39a4d99ac46..6e5af2b2723 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -50,7 +50,6 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; @@ -129,6 +128,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -195,7 +195,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final ShardPath path; - private final IndexShardOperationsLock indexShardOperationsLock; + private final IndexShardOperationPermits indexShardOperationPermits; private static final EnumSet readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); // for primaries, we only allow to write when actually started (so the cluster has decided we started) @@ -272,7 +272,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } this.cachingPolicy = cachingPolicy; } - indexShardOperationsLock = new IndexShardOperationsLock(shardId, logger, threadPool); + indexShardOperationPermits = new IndexShardOperationPermits(shardId, logger, threadPool); searcherWrapper = indexSearcherWrapper; primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); refreshListeners = buildRefreshListeners(); @@ -328,7 +328,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return this.shardFieldData; } - /** * Returns the primary term the index shard is on. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)} */ @@ -340,6 +339,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * notifies the shard of an increase in the primary term */ public void updatePrimaryTerm(final long newTerm) { + assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard"; synchronized (mutex) { if (newTerm != primaryTerm) { // Note that due to cluster state batching an initializing primary shard term can failed and re-assigned @@ -354,10 +354,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // // We could fail the shard in that case, but this will cause it to be removed from the insync allocations list // potentially preventing re-allocation. - assert shardRouting.primary() == false || shardRouting.initializing() == false : - "a started primary shard should never update it's term. shard: " + shardRouting - + " current term [" + primaryTerm + "] new term [" + newTerm + "]"; - assert newTerm > primaryTerm : "primary terms can only go up. current [" + primaryTerm + "], new [" + newTerm + "]"; + assert shardRouting.initializing() == false : + "a started primary shard should never update its term; " + + "shard " + shardRouting + ", " + + "current term [" + primaryTerm + "], " + + "new term [" + newTerm + "]"; + assert newTerm > primaryTerm : + "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newTerm + "]"; primaryTerm = newTerm; } } @@ -457,9 +460,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try { - indexShardOperationsLock.blockOperations(30, TimeUnit.MINUTES, () -> { - // no shard operation locks are being held here, move state from started to relocated - assert indexShardOperationsLock.getActiveOperationsCount() == 0 : + indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { + // no shard operation permits are being held here, move state from started to relocated + assert indexShardOperationPermits.getActiveOperationsCount() == 0 : "in-flight operations in progress while moving shard state to relocated"; synchronized (mutex) { if (state != IndexShardState.STARTED) { @@ -974,7 +977,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // playing safe here and close the engine even if the above succeeds - close can be called multiple times // Also closing refreshListeners to prevent us from accumulating any more listeners IOUtils.close(engine, refreshListeners); - indexShardOperationsLock.close(); + indexShardOperationPermits.close(); } } } @@ -1841,35 +1844,81 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } /** - * Acquire a primary operation lock whenever the shard is ready for indexing. If the lock is directly available, the provided - * ActionListener will be called on the calling thread. During relocation hand-off, lock acquisition can be delayed. The provided + * Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided + * ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided * ActionListener will then be called using the provided executor. */ - public void acquirePrimaryOperationLock(ActionListener onLockAcquired, String executorOnDelay) { + public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay) { verifyNotClosed(); verifyPrimary(); - indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, false); + indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false); } + private final Object primaryTermMutex = new Object(); + /** - * Acquire a replica operation lock whenever the shard is ready for indexing (see acquirePrimaryOperationLock). If the given primary - * term is lower then the one in {@link #shardRouting} an {@link IllegalArgumentException} is thrown. + * Acquire a replica operation permit whenever the shard is ready for indexing (see + * {@link #acquirePrimaryOperationPermit(ActionListener, String)}). If the given primary term is lower than then one in + * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an + * {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified + * name. + * + * @param operationPrimaryTerm the operation primary term + * @param onPermitAcquired the listener for permit acquisition + * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed */ - public void acquireReplicaOperationLock(long opPrimaryTerm, ActionListener onLockAcquired, String executorOnDelay) { + public void acquireReplicaOperationPermit( + final long operationPrimaryTerm, final ActionListener onPermitAcquired, final String executorOnDelay) { verifyNotClosed(); verifyReplicationTarget(); - if (primaryTerm > opPrimaryTerm) { - // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException - throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])", - shardId, opPrimaryTerm, primaryTerm)); + if (operationPrimaryTerm > primaryTerm) { + synchronized (primaryTermMutex) { + if (operationPrimaryTerm > primaryTerm) { + try { + indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { + assert operationPrimaryTerm > primaryTerm; + primaryTerm = operationPrimaryTerm; + }); + } catch (final InterruptedException | TimeoutException e) { + onPermitAcquired.onFailure(e); + return; + } + } + } } - indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, true); + assert operationPrimaryTerm <= primaryTerm + : "operation primary term [" + operationPrimaryTerm + "] should be at most [" + primaryTerm + "]"; + indexShardOperationPermits.acquire( + new ActionListener() { + @Override + public void onResponse(final Releasable releasable) { + if (operationPrimaryTerm < primaryTerm) { + releasable.close(); + final String message = String.format( + Locale.ROOT, + "%s operation primary term [%d] is too old (current [%d])", + shardId, + operationPrimaryTerm, + primaryTerm); + onPermitAcquired.onFailure(new IllegalStateException(message)); + } else { + onPermitAcquired.onResponse(releasable); + } + } + + @Override + public void onFailure(final Exception e) { + onPermitAcquired.onFailure(e); + } + }, + executorOnDelay, + true); } public int getActiveOperationsCount() { - return indexShardOperationsLock.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close + return indexShardOperationPermits.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close } private final AsyncIOProcessor translogSyncProcessor = new AsyncIOProcessor(logger, 1024) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationsLock.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java similarity index 86% rename from core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationsLock.java rename to core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 70e2037664f..016067259c1 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationsLock.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; @@ -36,7 +37,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; -public class IndexShardOperationsLock implements Closeable { +final class IndexShardOperationPermits implements Closeable { private final ShardId shardId; private final Logger logger; private final ThreadPool threadPool; @@ -44,10 +45,10 @@ public class IndexShardOperationsLock implements Closeable { private static final int TOTAL_PERMITS = Integer.MAX_VALUE; // fair semaphore to ensure that blockOperations() does not starve under thread contention final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); - @Nullable private List> delayedOperations; // operations that are delayed due to relocation hand-off + @Nullable private List> delayedOperations; // operations that are delayed private volatile boolean closed; - public IndexShardOperationsLock(ShardId shardId, Logger logger, ThreadPool threadPool) { + IndexShardOperationPermits(ShardId shardId, Logger logger, ThreadPool threadPool) { this.shardId = shardId; this.logger = logger; this.threadPool = threadPool; @@ -67,7 +68,7 @@ public class IndexShardOperationsLock implements Closeable { * @param onBlocked the action to run once the block has been acquired * @throws InterruptedException if calling thread is interrupted * @throws TimeoutException if timed out waiting for in-flight operations to finish - * @throws IndexShardClosedException if operation lock has been closed + * @throws IndexShardClosedException if operation permit has been closed */ public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) throws InterruptedException, TimeoutException { if (closed) { @@ -75,6 +76,7 @@ public class IndexShardOperationsLock implements Closeable { } try { if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { + assert semaphore.availablePermits() == 0; try { onBlocked.run(); } finally { @@ -91,7 +93,7 @@ public class IndexShardOperationsLock implements Closeable { } if (queuedActions != null) { // Try acquiring permits on fresh thread (for two reasons): - // - blockOperations is called on recovery thread which can be expected to be interrupted when recovery is cancelled. + // - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled. // Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by // ThreadedActionListener if the queue of the thread pool on which it submits is full. // - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure @@ -106,14 +108,14 @@ public class IndexShardOperationsLock implements Closeable { } /** - * Acquires a lock whenever lock acquisition is not blocked. If the lock is directly available, the provided - * ActionListener will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)}, lock - * acquisition can be delayed. The provided ActionListener will then be called using the provided executor once blockOperations - * terminates. + * Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided + * {@link ActionListener} will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)}, + * permit acquisition can be delayed. The provided ActionListener will then be called using the provided executor once operations are no + * longer blocked. * - * @param onAcquired ActionListener that is invoked once acquisition is successful or failed + * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed * @param executorOnDelay executor to use for delayed call - * @param forceExecution whether the runnable should force its execution in case it gets rejected + * @param forceExecution whether the runnable should force its execution in case it gets rejected */ public void acquire(ActionListener onAcquired, String executorOnDelay, boolean forceExecution) { if (closed) { diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 7371bde8939..147a952507a 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -561,6 +561,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes); final Set initializingIds = allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); + shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id())); shard.updateAllocationIdsFromMaster(activeIds, initializingIds); } } catch (Exception e) { @@ -737,6 +738,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple */ void updateRoutingEntry(ShardRouting shardRouting) throws IOException; + /** + * Update the primary term. This method should only be invoked on primary shards. + * + * @param primaryTerm the new primary term + */ + void updatePrimaryTerm(long primaryTerm); + /** * Notifies the service of the current allocation ids in the cluster state. * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details. diff --git a/core/src/main/java/org/elasticsearch/script/ScriptEngine.java b/core/src/main/java/org/elasticsearch/script/ScriptEngine.java index e9c92ddd419..caec8320082 100644 --- a/core/src/main/java/org/elasticsearch/script/ScriptEngine.java +++ b/core/src/main/java/org/elasticsearch/script/ScriptEngine.java @@ -48,11 +48,4 @@ public interface ScriptEngine extends Closeable { ExecutableScript executable(CompiledScript compiledScript, @Nullable Map vars); SearchScript search(CompiledScript compiledScript, SearchLookup lookup, @Nullable Map vars); - - /** - * Returns true if this scripting engine can safely accept inline scripts by default. The default is false - */ - default boolean isInlineScriptEnabled() { - return false; - } } diff --git a/core/src/main/java/org/elasticsearch/script/ScriptEngineRegistry.java b/core/src/main/java/org/elasticsearch/script/ScriptEngineRegistry.java deleted file mode 100644 index 4881d45f6c8..00000000000 --- a/core/src/main/java/org/elasticsearch/script/ScriptEngineRegistry.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.script; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -public class ScriptEngineRegistry { - - private final Map, String> registeredScriptEngineServices; - private final Map registeredLanguages; - private final Map defaultInlineScriptEnableds; - - public ScriptEngineRegistry(Iterable registrations) { - Objects.requireNonNull(registrations); - Map, String> registeredScriptEngineServices = new HashMap<>(); - Map registeredLanguages = new HashMap<>(); - Map inlineScriptEnableds = new HashMap<>(); - for (ScriptEngine service : registrations) { - String oldLanguage = registeredScriptEngineServices.putIfAbsent(service.getClass(), - service.getType()); - if (oldLanguage != null) { - throw new IllegalArgumentException("script engine service [" + service.getClass() + - "] already registered for language [" + oldLanguage + "]"); - } - String language = service.getType(); - ScriptEngine scriptEngine = - registeredLanguages.putIfAbsent(language, service); - if (scriptEngine != null) { - throw new IllegalArgumentException("scripting language [" + language + "] already registered for script engine service [" + - scriptEngine.getClass().getCanonicalName() + "]"); - } - inlineScriptEnableds.put(language, service.isInlineScriptEnabled()); - } - - this.registeredScriptEngineServices = Collections.unmodifiableMap(registeredScriptEngineServices); - this.registeredLanguages = Collections.unmodifiableMap(registeredLanguages); - this.defaultInlineScriptEnableds = Collections.unmodifiableMap(inlineScriptEnableds); - } - - Iterable> getRegisteredScriptEngineServices() { - return registeredScriptEngineServices.keySet(); - } - - String getLanguage(Class scriptEngineService) { - Objects.requireNonNull(scriptEngineService); - return registeredScriptEngineServices.get(scriptEngineService); - } - - public Map getRegisteredLanguages() { - return registeredLanguages; - } - - public Map getDefaultInlineScriptEnableds() { - return this.defaultInlineScriptEnableds; - } - -} diff --git a/core/src/main/java/org/elasticsearch/script/ScriptModule.java b/core/src/main/java/org/elasticsearch/script/ScriptModule.java index 1d7ecf119ec..7668b38c4cd 100644 --- a/core/src/main/java/org/elasticsearch/script/ScriptModule.java +++ b/core/src/main/java/org/elasticsearch/script/ScriptModule.java @@ -24,7 +24,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.ScriptPlugin; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @@ -52,14 +55,26 @@ public class ScriptModule { public ScriptModule(Settings settings, List scriptEngines, List customScriptContexts) { ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(customScriptContexts); - ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(scriptEngines); + Map enginesByName = getEnginesByName(scriptEngines); try { - scriptService = new ScriptService(settings, scriptEngineRegistry, scriptContextRegistry); + scriptService = new ScriptService(settings, enginesByName, scriptContextRegistry); } catch (IOException e) { throw new RuntimeException("Couldn't setup ScriptService", e); } } + private Map getEnginesByName(List engines) { + Map enginesByName = new HashMap<>(); + for (ScriptEngine engine : engines) { + ScriptEngine existing = enginesByName.put(engine.getType(), engine); + if (existing != null) { + throw new IllegalArgumentException("scripting language [" + engine.getType() + "] defined for engine [" + + existing.getClass().getName() + "] and [" + engine.getClass().getName()); + } + } + return Collections.unmodifiableMap(enginesByName); + } + /** * Service responsible for managing scripts. */ diff --git a/core/src/main/java/org/elasticsearch/script/ScriptService.java b/core/src/main/java/org/elasticsearch/script/ScriptService.java index 712fa5d3eaa..b681d07e983 100644 --- a/core/src/main/java/org/elasticsearch/script/ScriptService.java +++ b/core/src/main/java/org/elasticsearch/script/ScriptService.java @@ -96,11 +96,11 @@ public class ScriptService extends AbstractComponent implements Closeable, Clust private double scriptsPerMinCounter; private double compilesAllowedPerNano; - public ScriptService(Settings settings, ScriptEngineRegistry scriptEngineRegistry, ScriptContextRegistry scriptContextRegistry) throws IOException { + public ScriptService(Settings settings, Map engines, ScriptContextRegistry scriptContextRegistry) throws IOException { super(settings); Objects.requireNonNull(settings); - Objects.requireNonNull(scriptEngineRegistry); + this.engines = Objects.requireNonNull(engines); Objects.requireNonNull(scriptContextRegistry); if (Strings.hasLength(settings.get(DISABLE_DYNAMIC_SCRIPTING_SETTING))) { @@ -192,8 +192,6 @@ public class ScriptService extends AbstractComponent implements Closeable, Clust logger.debug("using script cache with max_size [{}], expire [{}]", cacheMaxSize, cacheExpire); this.cache = cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build(); - this.engines = scriptEngineRegistry.getRegisteredLanguages(); - this.lastInlineCompileTime = System.nanoTime(); this.setMaxCompilationsPerMinute(SCRIPT_MAX_COMPILATIONS_PER_MINUTE.get(settings)); } @@ -510,14 +508,6 @@ public class ScriptService extends AbstractComponent implements Closeable, Clust return getEngine(compiledScript.lang()).executable(compiledScript, params); } - /** - * Compiles (or retrieves from cache) and executes the provided search script - */ - public SearchScript search(SearchLookup lookup, Script script, ScriptContext scriptContext) { - CompiledScript compiledScript = compile(script, scriptContext); - return search(lookup, compiledScript, script.getParams()); - } - /** * Binds provided parameters to a compiled script returning a * {@link SearchScript} ready for execution diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 4174da37243..b6ac31c1d06 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; +import org.elasticsearch.script.CompiledScript; import org.elasticsearch.script.ScriptContext; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.SearchScript; @@ -685,7 +686,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } if (source.scriptFields() != null) { for (org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) { - SearchScript searchScript = scriptService.search(context.lookup(), field.script(), ScriptContext.Standard.SEARCH); + CompiledScript compile = scriptService.compile(field.script(), ScriptContext.Standard.SEARCH); + SearchScript searchScript = scriptService.search(context.lookup(), compile, field.script().getParams()); context.scriptFields().add(new ScriptField(field.fieldName(), searchScript, field.ignoreFailure())); } } diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index a02939ad206..5ab9e1ea535 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.VersionType; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.TransportService; @@ -66,7 +67,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa BulkRequest bulkRequest = new BulkRequest(); bulkRequest.add(new IndexRequest("no")); bulkRequest.add(new IndexRequest("can't")); - bulkRequest.add(new DeleteRequest("do")); + bulkRequest.add(new DeleteRequest("do").version(0).versionType(VersionType.EXTERNAL)); bulkRequest.add(new UpdateRequest("nothin", randomAlphaOfLength(5), randomAlphaOfLength(5))); indicesThatCannotBeCreatedTestCase(new HashSet<>(Arrays.asList("no", "can't", "do", "nothin")), bulkRequest, index -> { throw new IndexNotFoundException("Can't make it because I say so"); diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java new file mode 100644 index 00000000000..50fb348834f --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -0,0 +1,135 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.AutoCreateIndex; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; + +public class TransportBulkActionTests extends ESTestCase { + + /** Services needed by bulk action */ + private TransportService transportService; + private ClusterService clusterService; + private ThreadPool threadPool; + + private TestTransportBulkAction bulkAction; + + class TestTransportBulkAction extends TransportBulkAction { + + boolean indexCreated = false; // set when the "real" index is created + + TestTransportBulkAction() { + super(Settings.EMPTY, TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null, + null, new ActionFilters(Collections.emptySet()), new Resolver(Settings.EMPTY), + new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(Settings.EMPTY))); + } + + @Override + protected boolean needToCheck() { + return true; + } + + @Override + void createIndex(String index, TimeValue timeout, ActionListener listener) { + indexCreated = true; + listener.onResponse(null); + } + } + + @Before + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("TransportBulkActionTookTests"); + clusterService = createClusterService(threadPool); + CapturingTransport capturingTransport = new CapturingTransport(); + transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> clusterService.localNode(), null); + transportService.start(); + transportService.acceptIncomingRequests(); + bulkAction = new TestTransportBulkAction(); + } + + @After + public void tearDown() throws Exception { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + clusterService.close(); + super.tearDown(); + } + + public void testDeleteNonExistingDocDoesNotCreateIndex() throws Exception { + BulkRequest bulkRequest = new BulkRequest().add(new DeleteRequest("index", "type", "id")); + + bulkAction.execute(null, bulkRequest, ActionListener.wrap(response -> { + assertFalse(bulkAction.indexCreated); + BulkItemResponse[] bulkResponses = ((BulkResponse) response).getItems(); + assertEquals(bulkResponses.length, 1); + assertTrue(bulkResponses[0].isFailed()); + assertTrue(bulkResponses[0].getFailure().getCause() instanceof IndexNotFoundException); + assertEquals("index", bulkResponses[0].getFailure().getIndex()); + }, exception -> { + throw new AssertionError(exception); + })); + } + + public void testDeleteNonExistingDocExternalVersionCreatesIndex() throws Exception { + BulkRequest bulkRequest = new BulkRequest() + .add(new DeleteRequest("index", "type", "id").versionType(VersionType.EXTERNAL).version(0)); + + bulkAction.execute(null, bulkRequest, ActionListener.wrap(response -> { + assertTrue(bulkAction.indexCreated); + }, exception -> { + throw new AssertionError(exception); + })); + } + + public void testDeleteNonExistingDocExternalGteVersionCreatesIndex() throws Exception { + BulkRequest bulkRequest = new BulkRequest() + .add(new DeleteRequest("index2", "type", "id").versionType(VersionType.EXTERNAL_GTE).version(0)); + + bulkAction.execute(null, bulkRequest, ActionListener.wrap(response -> { + assertTrue(bulkAction.indexCreated); + }, exception -> { + throw new AssertionError(exception); + })); + } +} \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 0f8071cce36..89026d9d1db 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1091,7 +1091,7 @@ public class TransportReplicationActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString()); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; ActionListener callback = (ActionListener) invocation.getArguments()[1]; @@ -1103,7 +1103,7 @@ public class TransportReplicationActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index ca16e816b47..f0690ad67b5 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -444,7 +444,7 @@ public class TransportWriteActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString()); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; ActionListener callback = (ActionListener) invocation.getArguments()[1]; @@ -456,7 +456,7 @@ public class TransportWriteActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index a3bb96a72f2..38a94b7a107 100644 --- a/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -43,12 +43,10 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptContextRegistry; -import org.elasticsearch.script.ScriptEngineRegistry; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptType; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.RandomObjects; -import org.elasticsearch.watcher.ResourceWatcherService; import org.junit.Before; import java.io.IOException; @@ -60,7 +58,6 @@ import java.util.function.Function; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; -import static java.util.Collections.singletonList; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentHelper.toXContent; import static org.elasticsearch.script.MockScriptEngine.mockInlineScript; @@ -140,14 +137,10 @@ public class UpdateRequestTests extends ESTestCase { scripts.put("return", vars -> null); final ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(emptyList()); final MockScriptEngine engine = new MockScriptEngine("mock", scripts); - final ScriptEngineRegistry scriptEngineRegistry = - new ScriptEngineRegistry(singletonList(engine)); - final ResourceWatcherService watcherService = - new ResourceWatcherService(baseSettings, null); ScriptService scriptService = new ScriptService( baseSettings, - scriptEngineRegistry, + Collections.singletonMap(engine.getType(), engine), scriptContextRegistry); final Settings settings = settings(Version.CURRENT).build(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java index 7e05448bf91..01c335e6ec4 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java @@ -18,9 +18,12 @@ */ package org.elasticsearch.cluster.routing.allocation; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.Murmur3HashFunction; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; @@ -48,10 +51,23 @@ public class ShardStateIT extends ESIntegTestCase { indicesService.indexService(resolveIndex("test")).getShard(shard).failShard("simulated test failure", null); logger.info("--> waiting for a yellow index"); - // JDK 9 type inference gets confused, so we have to help the - // type inference - assertBusy(((Runnable) () -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(), - equalTo(ClusterHealthStatus.YELLOW)))); + ensureYellow(); + + // this forces the primary term to propagate to the replicas + int id = 0; + while (true) { + // find an ID that routes to the right shard, we will only index to the shard that saw a primary failure + final String idAsString = Integer.toString(id); + final int hash = Math.floorMod(Murmur3HashFunction.hash(idAsString), 2); + if (hash == shard) { + client() + .index(new IndexRequest("test", "type", idAsString).source("{ \"f\": \"" + idAsString + "\"}", XContentType.JSON)) + .get(); + break; + } else { + id++; + } + } final long term0 = shard == 0 ? 2 : 1; final long term1 = shard == 1 ? 2 : 1; @@ -63,13 +79,13 @@ public class ShardStateIT extends ESIntegTestCase { assertPrimaryTerms(term0, term1); } - protected void assertPrimaryTerms(long term0, long term1) { + protected void assertPrimaryTerms(long shard0Term, long shard1Term) { for (String node : internalCluster().getNodeNames()) { logger.debug("--> asserting primary terms terms on [{}]", node); ClusterState state = client(node).admin().cluster().prepareState().setLocal(true).get().getState(); IndexMetaData metaData = state.metaData().index("test"); - assertThat(metaData.primaryTerm(0), equalTo(term0)); - assertThat(metaData.primaryTerm(1), equalTo(term1)); + assertThat(metaData.primaryTerm(0), equalTo(shard0Term)); + assertThat(metaData.primaryTerm(1), equalTo(shard1Term)); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); IndexService indexService = indicesService.indexService(metaData.getIndex()); if (indexService != null) { diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 67f3a17c510..43f7be877fb 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -68,7 +68,6 @@ import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedInd import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.script.ScriptContextRegistry; -import org.elasticsearch.script.ScriptEngineRegistry; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.test.ClusterServiceUtils; @@ -78,14 +77,12 @@ import org.elasticsearch.test.TestSearchContext; import org.elasticsearch.test.engine.MockEngineFactory; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.watcher.ResourceWatcherService; import java.io.IOException; import java.util.Collections; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; public class IndexModuleTests extends ESTestCase { @@ -128,9 +125,8 @@ public class IndexModuleTests extends ESTestCase { threadPool = new TestThreadPool("test"); circuitBreakerService = new NoneCircuitBreakerService(); bigArrays = new BigArrays(settings, circuitBreakerService); - ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(emptyList()); ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList()); - scriptService = new ScriptService(settings, scriptEngineRegistry, scriptContextRegistry); + scriptService = new ScriptService(settings, Collections.emptyMap(), scriptContextRegistry); clusterService = ClusterServiceUtils.createClusterService(threadPool); nodeEnvironment = new NodeEnvironment(settings, environment); mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 0f68dc3c50a..8e27ab5e9d3 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -47,6 +47,8 @@ import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; @@ -59,6 +61,7 @@ import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; @@ -225,7 +228,6 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase assert shardRoutings().stream() .filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false : "replica with aId [" + replica.routingEntry().allocationId() + "] already exists"; - replica.updatePrimaryTerm(primary.getPrimaryTerm()); replicas.add(replica); updateAllocationIDsOnPrimary(); } @@ -254,17 +256,13 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase */ public synchronized void promoteReplicaToPrimary(IndexShard replica) throws IOException { final long newTerm = indexMetaData.primaryTerm(shardId.id()) + 1; - IndexMetaData.Builder newMetaData = - IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm); + IndexMetaData.Builder newMetaData = IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm); indexMetaData = newMetaData.build(); - for (IndexShard shard: replicas) { - shard.updatePrimaryTerm(newTerm); - } - boolean found = replicas.remove(replica); - assert found; + assertTrue(replicas.remove(replica)); closeShards(primary); primary = replica; - replica.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary()); + primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary()); + primary.updatePrimaryTerm(newTerm); updateAllocationIDsOnPrimary(); } @@ -476,15 +474,32 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase final ReplicaRequest request, final long globalCheckpoint, final ActionListener listener) { - try { - IndexShard replica = replicationGroup.replicas.stream() + IndexShard replica = replicationGroup.replicas.stream() .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get(); - replica.updateGlobalCheckpointOnReplica(globalCheckpoint); - performOnReplica(request, replica); - listener.onResponse(new ReplicaResponse(replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint())); - } catch (Exception e) { - listener.onFailure(e); - } + replica.acquireReplicaOperationPermit( + request.primaryTerm(), + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + try { + replica.updateGlobalCheckpointOnReplica(globalCheckpoint); + performOnReplica(request, replica); + releasable.close(); + listener.onResponse( + new ReplicaResponse( + replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint())); + } catch (final Exception e) { + Releasables.closeWhileHandlingException(releasable); + listener.onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }, + ThreadPool.Names.INDEX); } @Override diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationsLockTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java similarity index 84% rename from core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationsLockTests.java rename to core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index d3df93513d0..18a250a4282 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationsLockTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -37,16 +37,15 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; -import java.util.function.Supplier; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; -public class IndexShardOperationsLockTests extends ESTestCase { +public class IndexShardOperationPermitsTests extends ESTestCase { private static ThreadPool threadPool; - private IndexShardOperationsLock block; + private IndexShardOperationPermits permits; @BeforeClass public static void setupThreadPool() { @@ -61,13 +60,13 @@ public class IndexShardOperationsLockTests extends ESTestCase { @Before public void createIndexShardOperationsLock() { - block = new IndexShardOperationsLock(new ShardId("blubb", "id", 0), logger, threadPool); + permits = new IndexShardOperationPermits(new ShardId("blubb", "id", 0), logger, threadPool); } @After public void checkNoInflightOperations() { - assertThat(block.semaphore.availablePermits(), equalTo(Integer.MAX_VALUE)); - assertThat(block.getActiveOperationsCount(), equalTo(0)); + assertThat(permits.semaphore.availablePermits(), equalTo(Integer.MAX_VALUE)); + assertThat(permits.getActiveOperationsCount(), equalTo(0)); } public void testAllOperationsInvoked() throws InterruptedException, TimeoutException, ExecutionException { @@ -87,7 +86,7 @@ public class IndexShardOperationsLockTests extends ESTestCase { Thread thread = new Thread() { public void run() { latch.countDown(); - block.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true); } }; futures.add(future); @@ -123,29 +122,29 @@ public class IndexShardOperationsLockTests extends ESTestCase { public void testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionException, InterruptedException { PlainActionFuture future = new PlainActionFuture<>(); - block.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true); assertTrue(future.isDone()); future.get().close(); } public void testOperationsIfClosed() throws ExecutionException, InterruptedException { PlainActionFuture future = new PlainActionFuture<>(); - block.close(); - block.acquire(future, ThreadPool.Names.GENERIC, true); + permits.close(); + permits.acquire(future, ThreadPool.Names.GENERIC, true); ExecutionException exception = expectThrows(ExecutionException.class, future::get); assertThat(exception.getCause(), instanceOf(IndexShardClosedException.class)); } public void testBlockIfClosed() throws ExecutionException, InterruptedException { - block.close(); - expectThrows(IndexShardClosedException.class, () -> block.blockOperations(randomInt(10), TimeUnit.MINUTES, + permits.close(); + expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES, () -> { throw new IllegalArgumentException("fake error"); })); } public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException { PlainActionFuture future = new PlainActionFuture<>(); try (Releasable releasable = blockAndWait()) { - block.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true); assertFalse(future.isDone()); } future.get(1, TimeUnit.HOURS).close(); @@ -192,8 +191,8 @@ public class IndexShardOperationsLockTests extends ESTestCase { context.putHeader("foo", "bar"); context.putTransient("bar", "baz"); // test both with and without a executor name - block.acquire(future, ThreadPool.Names.GENERIC, true); - block.acquire(future2, null, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future2, null, true); } assertFalse(future.isDone()); } @@ -209,7 +208,7 @@ public class IndexShardOperationsLockTests extends ESTestCase { IndexShardClosedException exception = new IndexShardClosedException(new ShardId("blubb", "id", 0)); threadPool.generic().execute(() -> { try { - block.blockOperations(1, TimeUnit.MINUTES, () -> { + permits.blockOperations(1, TimeUnit.MINUTES, () -> { try { blockAcquired.countDown(); releaseBlock.await(); @@ -241,31 +240,31 @@ public class IndexShardOperationsLockTests extends ESTestCase { public void testActiveOperationsCount() throws ExecutionException, InterruptedException { PlainActionFuture future1 = new PlainActionFuture<>(); - block.acquire(future1, ThreadPool.Names.GENERIC, true); + permits.acquire(future1, ThreadPool.Names.GENERIC, true); assertTrue(future1.isDone()); - assertThat(block.getActiveOperationsCount(), equalTo(1)); + assertThat(permits.getActiveOperationsCount(), equalTo(1)); PlainActionFuture future2 = new PlainActionFuture<>(); - block.acquire(future2, ThreadPool.Names.GENERIC, true); + permits.acquire(future2, ThreadPool.Names.GENERIC, true); assertTrue(future2.isDone()); - assertThat(block.getActiveOperationsCount(), equalTo(2)); + assertThat(permits.getActiveOperationsCount(), equalTo(2)); future1.get().close(); - assertThat(block.getActiveOperationsCount(), equalTo(1)); + assertThat(permits.getActiveOperationsCount(), equalTo(1)); future1.get().close(); // check idempotence - assertThat(block.getActiveOperationsCount(), equalTo(1)); + assertThat(permits.getActiveOperationsCount(), equalTo(1)); future2.get().close(); - assertThat(block.getActiveOperationsCount(), equalTo(0)); + assertThat(permits.getActiveOperationsCount(), equalTo(0)); try (Releasable releasable = blockAndWait()) { - assertThat(block.getActiveOperationsCount(), equalTo(0)); + assertThat(permits.getActiveOperationsCount(), equalTo(0)); } PlainActionFuture future3 = new PlainActionFuture<>(); - block.acquire(future3, ThreadPool.Names.GENERIC, true); + permits.acquire(future3, ThreadPool.Names.GENERIC, true); assertTrue(future3.isDone()); - assertThat(block.getActiveOperationsCount(), equalTo(1)); + assertThat(permits.getActiveOperationsCount(), equalTo(1)); future3.get().close(); - assertThat(block.getActiveOperationsCount(), equalTo(0)); + assertThat(permits.getActiveOperationsCount(), equalTo(0)); } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 38bec989ab8..daaff4b1fc4 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -33,6 +33,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Constants; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.stats.CommonStats; @@ -118,8 +119,12 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.LongFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -130,11 +135,13 @@ import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; +import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; @@ -262,20 +269,20 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(indexShard); assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); try { - indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX); + indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected } try { - indexShard.acquireReplicaOperationLock(indexShard.getPrimaryTerm(), null, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), null, ThreadPool.Names.INDEX); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected } } - public void testOperationLocksOnPrimaryShards() throws InterruptedException, ExecutionException, IOException { + public void testOperationPermitsOnPrimaryShards() throws InterruptedException, ExecutionException, IOException { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; @@ -287,10 +294,10 @@ public class IndexShardTests extends IndexShardTestCase { // simulate promotion indexShard = newStartedShard(false); ShardRouting replicaRouting = indexShard.routingEntry(); - indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1); ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true, ShardRoutingState.STARTED, replicaRouting.allocationId()); indexShard.updateRoutingEntry(primaryRouting); + indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1); } else { indexShard = newStartedShard(true); } @@ -298,15 +305,15 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(0, indexShard.getActiveOperationsCount()); if (indexShard.routingEntry().isRelocationTarget() == false) { try { - indexShard.acquireReplicaOperationLock(primaryTerm, null, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(primaryTerm, null, ThreadPool.Names.INDEX); fail("shard shouldn't accept operations as replica"); } catch (IllegalStateException ignored) { } } - Releasable operation1 = acquirePrimaryOperationLockBlockingly(indexShard); + Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard); assertEquals(1, indexShard.getActiveOperationsCount()); - Releasable operation2 = acquirePrimaryOperationLockBlockingly(indexShard); + Releasable operation2 = acquirePrimaryOperationPermitBlockingly(indexShard); assertEquals(2, indexShard.getActiveOperationsCount()); Releasables.close(operation1, operation2); @@ -315,20 +322,20 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(indexShard); } - private Releasable acquirePrimaryOperationLockBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { + private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquirePrimaryOperationLock(fut, ThreadPool.Names.INDEX); + indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX); return fut.get(); } - private Releasable acquireReplicaOperationLockBlockingly(IndexShard indexShard, long opPrimaryTerm) + private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquireReplicaOperationLock(opPrimaryTerm, fut, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(opPrimaryTerm, fut, ThreadPool.Names.INDEX); return fut.get(); } - public void testOperationLocksOnReplicaShards() throws InterruptedException, ExecutionException, IOException { + public void testOperationPermitOnReplicaShards() throws InterruptedException, ExecutionException, IOException, BrokenBarrierException { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; @@ -367,33 +374,165 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(0, indexShard.getActiveOperationsCount()); if (shardRouting.primary() == false) { - try { - indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX); - fail("shard shouldn't accept primary ops"); - } catch (IllegalStateException ignored) { - - } + final IllegalStateException e = + expectThrows(IllegalStateException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX)); + assertThat(e, hasToString(containsString("shard is not a primary"))); } final long primaryTerm = indexShard.getPrimaryTerm(); - Releasable operation1 = acquireReplicaOperationLockBlockingly(indexShard, primaryTerm); + final Releasable operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm); assertEquals(1, indexShard.getActiveOperationsCount()); - Releasable operation2 = acquireReplicaOperationLockBlockingly(indexShard, primaryTerm); + final Releasable operation2 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm); assertEquals(2, indexShard.getActiveOperationsCount()); - try { - indexShard.acquireReplicaOperationLock(primaryTerm - 1, null, ThreadPool.Names.INDEX); - fail("you can not increment the operation counter with an older primary term"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("operation term")); - assertThat(e.getMessage(), containsString("too old")); + { + final AtomicBoolean onResponse = new AtomicBoolean(); + final AtomicBoolean onFailure = new AtomicBoolean(); + final AtomicReference onFailureException = new AtomicReference<>(); + ActionListener onLockAcquired = new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + onResponse.set(true); + } + + @Override + public void onFailure(Exception e) { + onFailure.set(true); + onFailureException.set(e); + } + }; + + indexShard.acquireReplicaOperationPermit(primaryTerm - 1, onLockAcquired, ThreadPool.Names.INDEX); + + assertFalse(onResponse.get()); + assertTrue(onFailure.get()); + assertThat(onFailureException.get(), instanceOf(IllegalStateException.class)); + assertThat( + onFailureException.get(), hasToString(containsString("operation primary term [" + (primaryTerm - 1) + "] is too old"))); } - // but you can increment with a newer one.. - acquireReplicaOperationLockBlockingly(indexShard, primaryTerm + 1 + randomInt(20)).close(); - Releasables.close(operation1, operation2); - assertEquals(0, indexShard.getActiveOperationsCount()); + { + final AtomicBoolean onResponse = new AtomicBoolean(); + final AtomicBoolean onFailure = new AtomicBoolean(); + final CyclicBarrier barrier = new CyclicBarrier(2); + // but you can not increment with a new primary term until the operations on the older primary term complete + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + indexShard.acquireReplicaOperationPermit( + primaryTerm + 1 + randomInt(20), + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + onResponse.set(true); + releasable.close(); + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onFailure(Exception e) { + onFailure.set(true); + } + }, + ThreadPool.Names.SAME); + }); + thread.start(); + barrier.await(); + // our operation should be blocked until the previous operations complete + assertFalse(onResponse.get()); + assertFalse(onFailure.get()); + Releasables.close(operation1); + // our operation should still be blocked + assertFalse(onResponse.get()); + assertFalse(onFailure.get()); + Releasables.close(operation2); + barrier.await(); + // now lock acquisition should have succeeded + assertTrue(onResponse.get()); + assertFalse(onFailure.get()); + thread.join(); + assertEquals(0, indexShard.getActiveOperationsCount()); + } + + closeShards(indexShard); + } + + public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException { + final IndexShard indexShard = newStartedShard(false); + + final CyclicBarrier barrier = new CyclicBarrier(3); + final CountDownLatch latch = new CountDownLatch(2); + + final long primaryTerm = indexShard.getPrimaryTerm(); + final AtomicLong counter = new AtomicLong(); + final AtomicReference onFailure = new AtomicReference<>(); + + final LongFunction function = increment -> () -> { + assert increment > 0; + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + indexShard.acquireReplicaOperationPermit( + primaryTerm + increment, + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + counter.incrementAndGet(); + assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm + increment)); + latch.countDown(); + releasable.close(); + } + + @Override + public void onFailure(Exception e) { + onFailure.set(e); + latch.countDown(); + } + }, + ThreadPool.Names.INDEX); + }; + + final long firstIncrement = 1 + (randomBoolean() ? 0 : 1); + final long secondIncrement = 1 + (randomBoolean() ? 0 : 1); + final Thread first = new Thread(function.apply(firstIncrement)); + final Thread second = new Thread(function.apply(secondIncrement)); + + first.start(); + second.start(); + + // the two threads synchronize attempting to acquire an operation permit + barrier.await(); + + // we wait for both operations to complete + latch.await(); + + first.join(); + second.join(); + + final Exception e; + if ((e = onFailure.get()) != null) { + /* + * If one thread tried to set the primary term to a higher value than the other thread and the thread with the higher term won + * the race, then the other thread lost the race and only one operation should have been executed. + */ + assertThat(e, instanceOf(IllegalStateException.class)); + assertThat(e, hasToString(matches("operation primary term \\[\\d+\\] is too old"))); + assertThat(counter.get(), equalTo(1L)); + } else { + assertThat(counter.get(), equalTo(2L)); + } + + assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm + Math.max(firstIncrement, secondIncrement))); closeShards(indexShard); } @@ -701,7 +840,7 @@ public class IndexShardTests extends IndexShardTestCase { } }); - try (Releasable ignored = acquirePrimaryOperationLockBlockingly(shard)) { + try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) { // start finalization of recovery recoveryThread.start(); latch.await(); @@ -711,7 +850,7 @@ public class IndexShardTests extends IndexShardTestCase { // recovery can be now finalized recoveryThread.join(); assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); - try (Releasable ignored = acquirePrimaryOperationLockBlockingly(shard)) { + try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) { // lock can again be acquired assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); } @@ -740,7 +879,7 @@ public class IndexShardTests extends IndexShardTestCase { super.onResponse(releasable); } }; - shard.acquirePrimaryOperationLock(onLockAcquired, ThreadPool.Names.INDEX); + shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.INDEX); onLockAcquiredActions.add(onLockAcquired); } @@ -764,7 +903,7 @@ public class IndexShardTests extends IndexShardTestCase { indexThreads[i] = new Thread() { @Override public void run() { - try (Releasable operationLock = acquirePrimaryOperationLockBlockingly(shard)) { + try (Releasable operationLock = acquirePrimaryOperationPermitBlockingly(shard)) { allPrimaryOperationLocksAcquired.countDown(); barrier.await(); } catch (InterruptedException | BrokenBarrierException | ExecutionException e) { diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index b9e1b23bda1..0a106530f05 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -362,6 +362,11 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC this.shardRouting = shardRouting; } + @Override + public void updatePrimaryTerm(long primaryTerm) { + term = primaryTerm; + } + @Override public void updateAllocationIdsFromMaster(Set activeAllocationIds, Set initializingAllocationIds) { this.activeAllocationIds = activeAllocationIds; diff --git a/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java index 59784456e98..20c9f3613e5 100644 --- a/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java +++ b/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java @@ -114,7 +114,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); PlainActionFuture fut = new PlainActionFuture<>(); - shard.acquirePrimaryOperationLock(fut, ThreadPool.Names.INDEX); + shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX); try (Releasable operationLock = fut.get()) { SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); flushService.attemptSyncedFlush(shardId, listener); diff --git a/core/src/test/java/org/elasticsearch/script/ScriptContextTests.java b/core/src/test/java/org/elasticsearch/script/ScriptContextTests.java index 6fc77eeb9b7..9b1b8c74d72 100644 --- a/core/src/test/java/org/elasticsearch/script/ScriptContextTests.java +++ b/core/src/test/java/org/elasticsearch/script/ScriptContextTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.test.ESTestCase; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; public class ScriptContextTests extends ESTestCase { @@ -42,13 +43,13 @@ public class ScriptContextTests extends ESTestCase { .build(); MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME, Collections.singletonMap("1", script -> "1")); - ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(scriptEngine)); + Map engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine); List customContexts = Arrays.asList( new ScriptContext.Plugin(PLUGIN_NAME, "custom_op"), new ScriptContext.Plugin(PLUGIN_NAME, "custom_exp_disabled_op"), new ScriptContext.Plugin(PLUGIN_NAME, "custom_globally_disabled_op")); ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(customContexts); - ScriptService scriptService = new ScriptService(settings, scriptEngineRegistry, scriptContextRegistry); + ScriptService scriptService = new ScriptService(settings, engines, scriptContextRegistry); ClusterState empty = ClusterState.builder(new ClusterName("_name")).build(); ScriptMetaData smd = empty.metaData().custom(ScriptMetaData.TYPE); diff --git a/core/src/test/java/org/elasticsearch/script/ScriptServiceTests.java b/core/src/test/java/org/elasticsearch/script/ScriptServiceTests.java index 671fdaf5026..4d887574050 100644 --- a/core/src/test/java/org/elasticsearch/script/ScriptServiceTests.java +++ b/core/src/test/java/org/elasticsearch/script/ScriptServiceTests.java @@ -54,8 +54,7 @@ public class ScriptServiceTests extends ESTestCase { private ScriptEngine scriptEngine; private ScriptEngine dangerousScriptEngine; - private Map scriptEnginesByLangMap; - private ScriptEngineRegistry scriptEngineRegistry; + private Map engines; private ScriptContextRegistry scriptContextRegistry; private ScriptContext[] scriptContexts; private ScriptService scriptService; @@ -77,7 +76,6 @@ public class ScriptServiceTests extends ESTestCase { .put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 10000) .build(); scriptEngine = new TestEngine(); - dangerousScriptEngine = new TestDangerousEngine(); TestEngine defaultScriptServiceEngine = new TestEngine(Script.DEFAULT_SCRIPT_LANG) {}; //randomly register custom script contexts int randomInt = randomIntBetween(0, 3); @@ -95,8 +93,9 @@ public class ScriptServiceTests extends ESTestCase { String context = plugin + "_" + operation; contexts.put(context, new ScriptContext.Plugin(plugin, operation)); } - scriptEngineRegistry = new ScriptEngineRegistry(Arrays.asList(scriptEngine, dangerousScriptEngine, - defaultScriptServiceEngine)); + engines = new HashMap<>(); + engines.put(scriptEngine.getType(), scriptEngine); + engines.put(defaultScriptServiceEngine.getType(), defaultScriptServiceEngine); scriptContextRegistry = new ScriptContextRegistry(contexts.values()); scriptContexts = scriptContextRegistry.scriptContexts().toArray(new ScriptContext[scriptContextRegistry.scriptContexts().size()]); logger.info("--> setup script service"); @@ -104,7 +103,7 @@ public class ScriptServiceTests extends ESTestCase { private void buildScriptService(Settings additionalSettings) throws IOException { Settings finalSettings = Settings.builder().put(baseSettings).put(additionalSettings).build(); - scriptService = new ScriptService(finalSettings, scriptEngineRegistry, scriptContextRegistry) { + scriptService = new ScriptService(finalSettings, engines, scriptContextRegistry) { @Override StoredScriptSource getScriptFromClusterState(String id, String lang) { //mock the script that gets retrieved from an index @@ -245,7 +244,9 @@ public class ScriptServiceTests extends ESTestCase { public void testSearchCountedInCompilationStats() throws IOException { buildScriptService(Settings.EMPTY); - scriptService.search(null, new Script(ScriptType.INLINE, "test", "1+1", Collections.emptyMap()), randomFrom(scriptContexts)); + Script script = new Script(ScriptType.INLINE, "test", "1+1", Collections.emptyMap()); + CompiledScript compile = scriptService.compile(script, randomFrom(scriptContexts)); + scriptService.search(null, compile, script.getParams()); assertEquals(1L, scriptService.stats().getCompilations()); } @@ -388,39 +389,5 @@ public class ScriptServiceTests extends ESTestCase { public void close() { } - - @Override - public boolean isInlineScriptEnabled() { - return true; - } - } - - public static class TestDangerousEngine implements ScriptEngine { - - public static final String NAME = "dtest"; - - @Override - public String getType() { - return NAME; - } - - @Override - public Object compile(String scriptName, String scriptSource, Map params) { - return "compiled_" + scriptSource; - } - - @Override - public ExecutableScript executable(final CompiledScript compiledScript, @Nullable Map vars) { - return null; - } - - @Override - public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, @Nullable Map vars) { - return null; - } - - @Override - public void close() { - } } } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/AvgIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/AvgIT.java index 9534bafa862..bef3df6fa91 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/AvgIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/AvgIT.java @@ -588,10 +588,5 @@ public class AvgIT extends AbstractNumericTestCase { } }; } - - @Override - public boolean isInlineScriptEnabled() { - return true; - } } } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/SumIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/SumIT.java index 95540e5bd17..f96b0d03496 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/SumIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/SumIT.java @@ -491,11 +491,6 @@ public class SumIT extends AbstractNumericTestCase { } }; } - - @Override - public boolean isInlineScriptEnabled() { - return true; - } } /** @@ -597,10 +592,5 @@ public class SumIT extends AbstractNumericTestCase { } }; } - - @Override - public boolean isInlineScriptEnabled() { - return true; - } } } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/ValueCountIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/ValueCountIT.java index 362e77ebc86..a795c9093a9 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/ValueCountIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/ValueCountIT.java @@ -344,10 +344,5 @@ public class ValueCountIT extends ESIntegTestCase { } }; } - - @Override - public boolean isInlineScriptEnabled() { - return true; - } } } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetricTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetricTests.java index 093dab2f4f6..dcd471355e7 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetricTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetricTests.java @@ -23,11 +23,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptContextRegistry; -import org.elasticsearch.script.ScriptEngineRegistry; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.aggregations.ParsedAggregation; @@ -119,10 +117,9 @@ public class InternalScriptedMetricTests extends InternalAggregationTestCase { return ((List) script.get("_aggs")).size(); })); - ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(scriptEngine)); ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList()); try { - return new ScriptService(Settings.EMPTY, scriptEngineRegistry, scriptContextRegistry); + return new ScriptService(Settings.EMPTY, Collections.singletonMap(scriptEngine.getType(), scriptEngine), scriptContextRegistry); } catch (IOException e) { throw new ElasticsearchException(e); } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorTests.java index 441be5b0e8c..e5b62e76763 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorTests.java @@ -27,7 +27,6 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.store.Directory; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryShardContext; @@ -36,7 +35,7 @@ import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.ScoreAccessor; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptContextRegistry; -import org.elasticsearch.script.ScriptEngineRegistry; +import org.elasticsearch.script.ScriptEngine; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.aggregations.AggregatorTestCase; @@ -198,11 +197,11 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase { protected QueryShardContext queryShardContextMock(MapperService mapperService, final MappedFieldType[] fieldTypes, CircuitBreakerService circuitBreakerService) { MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME, SCRIPTS); - ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(scriptEngine)); + Map engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine); ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList()); ScriptService scriptService; try { - scriptService = new ScriptService(Settings.EMPTY, scriptEngineRegistry, scriptContextRegistry); + scriptService = new ScriptService(Settings.EMPTY, engines, scriptContextRegistry); } catch (IOException e) { throw new ElasticsearchException(e); } diff --git a/core/src/test/java/org/elasticsearch/search/sort/AbstractSortTestCase.java b/core/src/test/java/org/elasticsearch/search/sort/AbstractSortTestCase.java index 83a3cf85149..a3d7d3aad7f 100644 --- a/core/src/test/java/org/elasticsearch/search/sort/AbstractSortTestCase.java +++ b/core/src/test/java/org/elasticsearch/search/sort/AbstractSortTestCase.java @@ -54,7 +54,7 @@ import org.elasticsearch.script.CompiledScript; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptContext; import org.elasticsearch.script.ScriptContextRegistry; -import org.elasticsearch.script.ScriptEngineRegistry; +import org.elasticsearch.script.ScriptEngine; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptServiceTests.TestEngine; import org.elasticsearch.script.ScriptType; @@ -62,7 +62,6 @@ import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchModule; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; -import org.elasticsearch.watcher.ResourceWatcherService; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -89,8 +88,8 @@ public abstract class AbstractSortTestCase> extends EST .put(Environment.PATH_CONF_SETTING.getKey(), genericConfigFolder) .build(); ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList()); - ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(new TestEngine())); - scriptService = new ScriptService(baseSettings, scriptEngineRegistry, scriptContextRegistry) { + ScriptEngine engine = new TestEngine(); + scriptService = new ScriptService(baseSettings, Collections.singletonMap(engine.getType(), engine), scriptContextRegistry) { @Override public CompiledScript compile(Script script, ScriptContext scriptContext) { return new CompiledScript(ScriptType.INLINE, "mockName", "test", script); diff --git a/core/src/test/java/org/elasticsearch/search/suggest/SuggestSearchIT.java b/core/src/test/java/org/elasticsearch/search/suggest/SuggestSearchIT.java index 9bc3c9e2b70..b08eaee3daf 100644 --- a/core/src/test/java/org/elasticsearch/search/suggest/SuggestSearchIT.java +++ b/core/src/test/java/org/elasticsearch/search/suggest/SuggestSearchIT.java @@ -1059,11 +1059,6 @@ public class SuggestSearchIT extends ESIntegTestCase { public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map vars) { throw new UnsupportedOperationException("search script not supported"); } - - @Override - public boolean isInlineScriptEnabled() { - return true; - } } public void testPhraseSuggesterCollate() throws InterruptedException, ExecutionException, IOException { diff --git a/core/src/test/java/org/elasticsearch/update/UpdateIT.java b/core/src/test/java/org/elasticsearch/update/UpdateIT.java index defbc411c9e..f6eeadfc649 100644 --- a/core/src/test/java/org/elasticsearch/update/UpdateIT.java +++ b/core/src/test/java/org/elasticsearch/update/UpdateIT.java @@ -132,11 +132,6 @@ public class UpdateIT extends ESIntegTestCase { public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map vars) { throw new UnsupportedOperationException(); } - - @Override - public boolean isInlineScriptEnabled() { - return true; - } } public static class FieldIncrementScriptPlugin extends Plugin implements ScriptPlugin { @@ -193,11 +188,6 @@ public class UpdateIT extends ESIntegTestCase { public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map vars) { throw new UnsupportedOperationException(); } - - @Override - public boolean isInlineScriptEnabled() { - return true; - } } public static class ScriptedUpsertScriptPlugin extends Plugin implements ScriptPlugin { @@ -254,12 +244,6 @@ public class UpdateIT extends ESIntegTestCase { public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map vars) { throw new UnsupportedOperationException(); } - - @Override - public boolean isInlineScriptEnabled() { - return true; - } - } public static class ExtractContextInSourceScriptPlugin extends Plugin implements ScriptPlugin { @@ -317,11 +301,6 @@ public class UpdateIT extends ESIntegTestCase { public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map vars) { throw new UnsupportedOperationException(); } - - @Override - public boolean isInlineScriptEnabled() { - return true; - } } @Override diff --git a/docs/reference/docs/delete.asciidoc b/docs/reference/docs/delete.asciidoc index 6fbe5e33ce2..f26a7fc64d0 100644 --- a/docs/reference/docs/delete.asciidoc +++ b/docs/reference/docs/delete.asciidoc @@ -105,7 +105,8 @@ thrown instead. [[delete-index-creation]] === Automatic index creation -The delete operation automatically creates an index if it has not been +If an <> is used, +the delete operation automatically creates an index if it has not been created before (check out the <> for manually creating an index), and also automatically creates a dynamic type mapping for the specific type if it has not been created diff --git a/docs/reference/migration/migrate_6_0/indices.asciidoc b/docs/reference/migration/migrate_6_0/indices.asciidoc index 0a05fd55139..95d0253fe63 100644 --- a/docs/reference/migration/migrate_6_0/indices.asciidoc +++ b/docs/reference/migration/migrate_6_0/indices.asciidoc @@ -44,3 +44,9 @@ The default value of the `allow_no_indices` option for the Open/Close index API has been changed from `false` to `true` so it is aligned with the behaviour of the Delete index API. As a result, Open/Close index API don't return an error by default when a provided wildcard expression doesn't match any closed/open index. + +==== Delete a document + +Delete a document from non-existing index has been modified to not create the index. +However if an external versioning is used the index will be created and the document +will be marked for deletion. diff --git a/docs/reference/migration/migrate_6_0/mappings.asciidoc b/docs/reference/migration/migrate_6_0/mappings.asciidoc index e85b31d97ff..369ba3da162 100644 --- a/docs/reference/migration/migrate_6_0/mappings.asciidoc +++ b/docs/reference/migration/migrate_6_0/mappings.asciidoc @@ -26,6 +26,6 @@ now disallowed for these indices' mappings. ==== Unrecognized `match_mapping_type` options not silently ignored -Previously Elastiscearch would silently ignore any dynamic templates that +Previously Elasticsearch would silently ignore any dynamic templates that included a `match_mapping_type` type that was unrecognized. An exception is now thrown on an unrecognized type. diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index a9a5d2993c0..ee34707bf9b 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -94,6 +94,8 @@ thread_pool: [float] ==== `fixed_auto_queue_size` +experimental[] + The `fixed_auto_queue_size` thread pool holds a fixed size of threads to handle the requests with a bounded queue for pending requests that have no threads to service them. It's similar to the `fixed` threadpool, however, the `queue_size` diff --git a/modules/lang-expression/src/main/java/org/elasticsearch/script/expression/ExpressionScriptEngine.java b/modules/lang-expression/src/main/java/org/elasticsearch/script/expression/ExpressionScriptEngine.java index c8557a3c620..11a10247e03 100644 --- a/modules/lang-expression/src/main/java/org/elasticsearch/script/expression/ExpressionScriptEngine.java +++ b/modules/lang-expression/src/main/java/org/elasticsearch/script/expression/ExpressionScriptEngine.java @@ -255,9 +255,4 @@ public class ExpressionScriptEngine extends AbstractComponent implements ScriptE @Override public void close() {} - - @Override - public boolean isInlineScriptEnabled() { - return true; - } } diff --git a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MustacheScriptEngine.java b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MustacheScriptEngine.java index 8aa2c8555f5..d5e35b6383f 100644 --- a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MustacheScriptEngine.java +++ b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MustacheScriptEngine.java @@ -142,9 +142,4 @@ public final class MustacheScriptEngine implements ScriptEngine { return writer.toString(); } } - - @Override - public boolean isInlineScriptEnabled() { - return true; - } } diff --git a/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScriptEngine.java b/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScriptEngine.java index 57ecb52cc27..aa96b7c1482 100644 --- a/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScriptEngine.java +++ b/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScriptEngine.java @@ -264,9 +264,4 @@ public final class PainlessScriptEngine extends AbstractComponent implements Scr private int getNextStatement(String scriptSource, int offset) { return Math.min(scriptSource.length(), offset + 25); } - - @Override - public boolean isInlineScriptEnabled() { - return true; - } } diff --git a/modules/percolator/src/main/java/org/elasticsearch/percolator/PercolatorFieldMapper.java b/modules/percolator/src/main/java/org/elasticsearch/percolator/PercolatorFieldMapper.java index 1865f68158e..186e53591ae 100644 --- a/modules/percolator/src/main/java/org/elasticsearch/percolator/PercolatorFieldMapper.java +++ b/modules/percolator/src/main/java/org/elasticsearch/percolator/PercolatorFieldMapper.java @@ -282,7 +282,7 @@ public class PercolatorFieldMapper extends FieldMapper { ); verifyQuery(queryBuilder); // Fetching of terms, shapes and indexed scripts happen during this rewrite: - queryBuilder = queryBuilder.rewrite(queryShardContext); + queryBuilder = QueryBuilder.rewriteQuery(queryBuilder, queryShardContext); try (XContentBuilder builder = XContentFactory.contentBuilder(QUERY_BUILDER_CONTENT_TYPE)) { queryBuilder.toXContent(builder, new MapParams(Collections.emptyMap())); diff --git a/plugins/analysis-icu/build.gradle b/plugins/analysis-icu/build.gradle index 53f2747c0a2..a25c2c771ff 100644 --- a/plugins/analysis-icu/build.gradle +++ b/plugins/analysis-icu/build.gradle @@ -24,7 +24,7 @@ esplugin { dependencies { compile "org.apache.lucene:lucene-analyzers-icu:${versions.lucene}" - compile 'com.ibm.icu:icu4j:54.1' + compile 'com.ibm.icu:icu4j:56.1' } dependencyLicenses { diff --git a/plugins/analysis-icu/licenses/icu4j-54.1.jar.sha1 b/plugins/analysis-icu/licenses/icu4j-54.1.jar.sha1 deleted file mode 100644 index 25d6eb37acc..00000000000 --- a/plugins/analysis-icu/licenses/icu4j-54.1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -3f66ecd5871467598bc81662817b80612a0a907f diff --git a/plugins/analysis-icu/licenses/icu4j-56.1.jar.sha1 b/plugins/analysis-icu/licenses/icu4j-56.1.jar.sha1 new file mode 100644 index 00000000000..51dc722bf92 --- /dev/null +++ b/plugins/analysis-icu/licenses/icu4j-56.1.jar.sha1 @@ -0,0 +1 @@ +8dd6671f52165a0419e6de5e1016400875a90fa9 \ No newline at end of file diff --git a/plugins/examples/script-expert-scoring/src/main/java/org/elasticsearch/example/expertscript/ExpertScriptPlugin.java b/plugins/examples/script-expert-scoring/src/main/java/org/elasticsearch/example/expertscript/ExpertScriptPlugin.java index d56e7932499..b06743b95e7 100644 --- a/plugins/examples/script-expert-scoring/src/main/java/org/elasticsearch/example/expertscript/ExpertScriptPlugin.java +++ b/plugins/examples/script-expert-scoring/src/main/java/org/elasticsearch/example/expertscript/ExpertScriptPlugin.java @@ -133,11 +133,6 @@ public class ExpertScriptPlugin extends Plugin implements ScriptPlugin { throw new UnsupportedOperationException(); } - @Override - public boolean isInlineScriptEnabled() { - return true; - } - @Override public void close() {} } diff --git a/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/AbstractScriptTestCase.java b/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/AbstractScriptTestCase.java index 8ca24d8b449..3fa9bbd17b3 100644 --- a/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/AbstractScriptTestCase.java +++ b/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/AbstractScriptTestCase.java @@ -21,13 +21,14 @@ package org.elasticsearch.ingest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.script.ScriptContextRegistry; -import org.elasticsearch.script.ScriptEngineRegistry; +import org.elasticsearch.script.ScriptEngine; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.mustache.MustacheScriptEngine; import org.elasticsearch.test.ESTestCase; import org.junit.Before; import java.util.Collections; +import java.util.Map; public abstract class AbstractScriptTestCase extends ESTestCase { @@ -35,9 +36,10 @@ public abstract class AbstractScriptTestCase extends ESTestCase { @Before public void init() throws Exception { - ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(new MustacheScriptEngine())); + MustacheScriptEngine engine = new MustacheScriptEngine(); + Map engines = Collections.singletonMap(engine.getType(), engine); ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList()); - ScriptService scriptService = new ScriptService(Settings.EMPTY, scriptEngineRegistry, scriptContextRegistry); + ScriptService scriptService = new ScriptService(Settings.EMPTY, engines, scriptContextRegistry); templateService = new InternalTemplateService(scriptService); } diff --git a/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java b/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java index 9231e6cbf46..d5dfe15111d 100644 --- a/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java @@ -94,12 +94,6 @@ public class MockScriptEngine implements ScriptEngine { public void close() throws IOException { } - @Override - public boolean isInlineScriptEnabled() { - return true; - } - - public class MockCompiledScript { private final String name;