Merge branch 'master' into feature/client_aggs_parsing

This commit is contained in:
javanna 2017-05-22 12:25:14 +02:00 committed by Luca Cavanna
commit 7a3e38eb8e
51 changed files with 586 additions and 400 deletions

View File

@ -657,11 +657,11 @@ class ClusterFormationTasks {
standardOutput = new ByteArrayOutputStream() standardOutput = new ByteArrayOutputStream()
doLast { doLast {
String out = standardOutput.toString() String out = standardOutput.toString()
if (out.contains("${pid} org.elasticsearch.bootstrap.Elasticsearch") == false) { if (out.contains("${ext.pid} org.elasticsearch.bootstrap.Elasticsearch") == false) {
logger.error('jps -l') logger.error('jps -l')
logger.error(out) logger.error(out)
logger.error("pid file: ${pidFile}") logger.error("pid file: ${node.pidFile}")
logger.error("pid: ${pid}") logger.error("pid: ${ext.pid}")
throw new GradleException("jps -l did not report any process with org.elasticsearch.bootstrap.Elasticsearch\n" + throw new GradleException("jps -l did not report any process with org.elasticsearch.bootstrap.Elasticsearch\n" +
"Did you run gradle clean? Maybe an old pid file is still lying around.") "Did you run gradle clean? Maybe an old pid file is still lying around.")
} else { } else {

View File

@ -55,7 +55,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
@ -63,16 +62,6 @@ import static java.util.Collections.singletonMap;
public class CrudIT extends ESRestHighLevelClientTestCase { public class CrudIT extends ESRestHighLevelClientTestCase {
public void testDelete() throws IOException { public void testDelete() throws IOException {
{
// Testing non existing document
String docId = "does_not_exist";
DeleteRequest deleteRequest = new DeleteRequest("index", "type", docId);
DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync);
assertEquals("index", deleteResponse.getIndex());
assertEquals("type", deleteResponse.getType());
assertEquals(docId, deleteResponse.getId());
assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteResponse.getResult());
}
{ {
// Testing deletion // Testing deletion
String docId = "id"; String docId = "id";
@ -87,6 +76,16 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
assertEquals(docId, deleteResponse.getId()); assertEquals(docId, deleteResponse.getId());
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
} }
{
// Testing non existing document
String docId = "does_not_exist";
DeleteRequest deleteRequest = new DeleteRequest("index", "type", docId);
DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync);
assertEquals("index", deleteResponse.getIndex());
assertEquals("type", deleteResponse.getType());
assertEquals(docId, deleteResponse.getId());
assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteResponse.getResult());
}
{ {
// Testing version conflict // Testing version conflict
String docId = "version_conflict"; String docId = "version_conflict";

View File

@ -72,6 +72,8 @@ public class Version implements Comparable<Version> {
public static final Version V_5_3_1 = new Version(V_5_3_1_ID, org.apache.lucene.util.Version.LUCENE_6_4_2); public static final Version V_5_3_1 = new Version(V_5_3_1_ID, org.apache.lucene.util.Version.LUCENE_6_4_2);
public static final int V_5_3_2_ID = 5030299; public static final int V_5_3_2_ID = 5030299;
public static final Version V_5_3_2 = new Version(V_5_3_2_ID, org.apache.lucene.util.Version.LUCENE_6_4_2); public static final Version V_5_3_2 = new Version(V_5_3_2_ID, org.apache.lucene.util.Version.LUCENE_6_4_2);
public static final int V_5_3_3_ID_UNRELEASED = 5030399;
public static final Version V_5_3_3_UNRELEASED = new Version(V_5_3_3_ID_UNRELEASED, org.apache.lucene.util.Version.LUCENE_6_4_2);
public static final int V_5_4_0_ID = 5040099; public static final int V_5_4_0_ID = 5040099;
public static final Version V_5_4_0 = new Version(V_5_4_0_ID, org.apache.lucene.util.Version.LUCENE_6_5_0); public static final Version V_5_4_0 = new Version(V_5_4_0_ID, org.apache.lucene.util.Version.LUCENE_6_5_0);
public static final int V_5_5_0_ID_UNRELEASED = 5050099; public static final int V_5_5_0_ID_UNRELEASED = 5050099;
@ -105,6 +107,8 @@ public class Version implements Comparable<Version> {
return V_5_5_0_UNRELEASED; return V_5_5_0_UNRELEASED;
case V_5_4_0_ID: case V_5_4_0_ID:
return V_5_4_0; return V_5_4_0;
case V_5_3_3_ID_UNRELEASED:
return V_5_3_3_UNRELEASED;
case V_5_3_2_ID: case V_5_3_2_ID:
return V_5_3_2; return V_5_3_2;
case V_5_3_1_ID: case V_5_3_1_ID:

View File

@ -54,6 +54,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.IngestService;
@ -144,6 +145,11 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
// Attempt to create all the indices that we're going to need during the bulk before we start. // Attempt to create all the indices that we're going to need during the bulk before we start.
// Step 1: collect all the indices in the request // Step 1: collect all the indices in the request
final Set<String> indices = bulkRequest.requests.stream() final Set<String> indices = bulkRequest.requests.stream()
// delete requests should not attempt to create the index (if the index does not
// exists), unless an external versioning is used
.filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
|| request.versionType() == VersionType.EXTERNAL
|| request.versionType() == VersionType.EXTERNAL_GTE)
.map(DocWriteRequest::index) .map(DocWriteRequest::index)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
/* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create

View File

@ -38,7 +38,6 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@ -52,7 +51,6 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
@ -179,8 +177,8 @@ public abstract class TransportReplicationAction<
Request shardRequest, IndexShard primary) throws Exception; Request shardRequest, IndexShard primary) throws Exception;
/** /**
* Synchronous replica operation on nodes with replica copies. This is done under the lock form * Synchronously execute the specified replica operation. This is done under a permit from
* {@link IndexShard#acquireReplicaOperationLock(long, ActionListener, String)} * {@link IndexShard#acquireReplicaOperationPermit(long, ActionListener, String)}.
* *
* @param shardRequest the request to the replica shard * @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on * @param replica the replica shard to perform the operation on
@ -584,7 +582,7 @@ public abstract class TransportReplicationAction<
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID, throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
actualAllocationId); actualAllocationId);
} }
replica.acquireReplicaOperationLock(request.primaryTerm, this, executor); replica.acquireReplicaOperationPermit(request.primaryTerm, this, executor);
} }
/** /**
@ -921,7 +919,7 @@ public abstract class TransportReplicationAction<
} }
}; };
indexShard.acquirePrimaryOperationLock(onAcquired, executor); indexShard.acquirePrimaryOperationPermit(onAcquired, executor);
} }
class ShardReference implements Releasable { class ShardReference implements Releasable {

View File

@ -612,11 +612,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
rescheduleFsyncTask(durability); rescheduleFsyncTask(durability);
} }
} }
// update primary terms
for (final IndexShard shard : this.shards.values()) {
shard.updatePrimaryTerm(metadata.primaryTerm(shard.shardId().id()));
}
} }
private void rescheduleFsyncTask(Translog.Durability durability) { private void rescheduleFsyncTask(Translog.Durability durability) {

View File

@ -333,7 +333,8 @@ public class QueryShardContext extends QueryRewriteContext {
*/ */
public final SearchScript getSearchScript(Script script, ScriptContext context) { public final SearchScript getSearchScript(Script script, ScriptContext context) {
failIfFrozen(); failIfFrozen();
return scriptService.search(lookup(), script, context); CompiledScript compile = scriptService.compile(script, context);
return scriptService.search(lookup(), compile, script.getParams());
} }
/** /**
* Returns a lazily created {@link SearchScript} that is compiled immediately but can be pulled later once all * Returns a lazily created {@link SearchScript} that is compiled immediately but can be pulled later once all

View File

@ -50,7 +50,6 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -129,6 +128,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
@ -195,7 +195,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final ShardPath path; private final ShardPath path;
private final IndexShardOperationsLock indexShardOperationsLock; private final IndexShardOperationPermits indexShardOperationPermits;
private static final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); private static final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
// for primaries, we only allow to write when actually started (so the cluster has decided we started) // for primaries, we only allow to write when actually started (so the cluster has decided we started)
@ -272,7 +272,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} }
this.cachingPolicy = cachingPolicy; this.cachingPolicy = cachingPolicy;
} }
indexShardOperationsLock = new IndexShardOperationsLock(shardId, logger, threadPool); indexShardOperationPermits = new IndexShardOperationPermits(shardId, logger, threadPool);
searcherWrapper = indexSearcherWrapper; searcherWrapper = indexSearcherWrapper;
primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
refreshListeners = buildRefreshListeners(); refreshListeners = buildRefreshListeners();
@ -328,7 +328,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return this.shardFieldData; return this.shardFieldData;
} }
/** /**
* Returns the primary term the index shard is on. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)} * Returns the primary term the index shard is on. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
*/ */
@ -340,6 +339,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* notifies the shard of an increase in the primary term * notifies the shard of an increase in the primary term
*/ */
public void updatePrimaryTerm(final long newTerm) { public void updatePrimaryTerm(final long newTerm) {
assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
synchronized (mutex) { synchronized (mutex) {
if (newTerm != primaryTerm) { if (newTerm != primaryTerm) {
// Note that due to cluster state batching an initializing primary shard term can failed and re-assigned // Note that due to cluster state batching an initializing primary shard term can failed and re-assigned
@ -354,10 +354,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// //
// We could fail the shard in that case, but this will cause it to be removed from the insync allocations list // We could fail the shard in that case, but this will cause it to be removed from the insync allocations list
// potentially preventing re-allocation. // potentially preventing re-allocation.
assert shardRouting.primary() == false || shardRouting.initializing() == false : assert shardRouting.initializing() == false :
"a started primary shard should never update it's term. shard: " + shardRouting "a started primary shard should never update its term; "
+ " current term [" + primaryTerm + "] new term [" + newTerm + "]"; + "shard " + shardRouting + ", "
assert newTerm > primaryTerm : "primary terms can only go up. current [" + primaryTerm + "], new [" + newTerm + "]"; + "current term [" + primaryTerm + "], "
+ "new term [" + newTerm + "]";
assert newTerm > primaryTerm :
"primary terms can only go up; current term [" + primaryTerm + "], new term [" + newTerm + "]";
primaryTerm = newTerm; primaryTerm = newTerm;
} }
} }
@ -457,9 +460,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException { public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try { try {
indexShardOperationsLock.blockOperations(30, TimeUnit.MINUTES, () -> { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
// no shard operation locks are being held here, move state from started to relocated // no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationsLock.getActiveOperationsCount() == 0 : assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
"in-flight operations in progress while moving shard state to relocated"; "in-flight operations in progress while moving shard state to relocated";
synchronized (mutex) { synchronized (mutex) {
if (state != IndexShardState.STARTED) { if (state != IndexShardState.STARTED) {
@ -974,7 +977,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// playing safe here and close the engine even if the above succeeds - close can be called multiple times // playing safe here and close the engine even if the above succeeds - close can be called multiple times
// Also closing refreshListeners to prevent us from accumulating any more listeners // Also closing refreshListeners to prevent us from accumulating any more listeners
IOUtils.close(engine, refreshListeners); IOUtils.close(engine, refreshListeners);
indexShardOperationsLock.close(); indexShardOperationPermits.close();
} }
} }
} }
@ -1841,35 +1844,81 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} }
/** /**
* Acquire a primary operation lock whenever the shard is ready for indexing. If the lock is directly available, the provided * Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, lock acquisition can be delayed. The provided * ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
* ActionListener will then be called using the provided executor. * ActionListener will then be called using the provided executor.
*/ */
public void acquirePrimaryOperationLock(ActionListener<Releasable> onLockAcquired, String executorOnDelay) { public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay) {
verifyNotClosed(); verifyNotClosed();
verifyPrimary(); verifyPrimary();
indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, false); indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false);
} }
private final Object primaryTermMutex = new Object();
/** /**
* Acquire a replica operation lock whenever the shard is ready for indexing (see acquirePrimaryOperationLock). If the given primary * Acquire a replica operation permit whenever the shard is ready for indexing (see
* term is lower then the one in {@link #shardRouting} an {@link IllegalArgumentException} is thrown. * {@link #acquirePrimaryOperationPermit(ActionListener, String)}). If the given primary term is lower than then one in
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
* {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified
* name.
*
* @param operationPrimaryTerm the operation primary term
* @param onPermitAcquired the listener for permit acquisition
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
*/ */
public void acquireReplicaOperationLock(long opPrimaryTerm, ActionListener<Releasable> onLockAcquired, String executorOnDelay) { public void acquireReplicaOperationPermit(
final long operationPrimaryTerm, final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
verifyNotClosed(); verifyNotClosed();
verifyReplicationTarget(); verifyReplicationTarget();
if (primaryTerm > opPrimaryTerm) { if (operationPrimaryTerm > primaryTerm) {
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException synchronized (primaryTermMutex) {
throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])", if (operationPrimaryTerm > primaryTerm) {
shardId, opPrimaryTerm, primaryTerm)); try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
assert operationPrimaryTerm > primaryTerm;
primaryTerm = operationPrimaryTerm;
});
} catch (final InterruptedException | TimeoutException e) {
onPermitAcquired.onFailure(e);
return;
}
}
}
} }
indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, true); assert operationPrimaryTerm <= primaryTerm
: "operation primary term [" + operationPrimaryTerm + "] should be at most [" + primaryTerm + "]";
indexShardOperationPermits.acquire(
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
if (operationPrimaryTerm < primaryTerm) {
releasable.close();
final String message = String.format(
Locale.ROOT,
"%s operation primary term [%d] is too old (current [%d])",
shardId,
operationPrimaryTerm,
primaryTerm);
onPermitAcquired.onFailure(new IllegalStateException(message));
} else {
onPermitAcquired.onResponse(releasable);
}
}
@Override
public void onFailure(final Exception e) {
onPermitAcquired.onFailure(e);
}
},
executorOnDelay,
true);
} }
public int getActiveOperationsCount() { public int getActiveOperationsCount() {
return indexShardOperationsLock.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close return indexShardOperationPermits.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close
} }
private final AsyncIOProcessor<Translog.Location> translogSyncProcessor = new AsyncIOProcessor<Translog.Location>(logger, 1024) { private final AsyncIOProcessor<Translog.Location> translogSyncProcessor = new AsyncIOProcessor<Translog.Location>(logger, 1024) {

View File

@ -16,6 +16,7 @@
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.elasticsearch.index.shard; package org.elasticsearch.index.shard;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -36,7 +37,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier; import java.util.function.Supplier;
public class IndexShardOperationsLock implements Closeable { final class IndexShardOperationPermits implements Closeable {
private final ShardId shardId; private final ShardId shardId;
private final Logger logger; private final Logger logger;
private final ThreadPool threadPool; private final ThreadPool threadPool;
@ -44,10 +45,10 @@ public class IndexShardOperationsLock implements Closeable {
private static final int TOTAL_PERMITS = Integer.MAX_VALUE; private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
// fair semaphore to ensure that blockOperations() does not starve under thread contention // fair semaphore to ensure that blockOperations() does not starve under thread contention
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
@Nullable private List<ActionListener<Releasable>> delayedOperations; // operations that are delayed due to relocation hand-off @Nullable private List<ActionListener<Releasable>> delayedOperations; // operations that are delayed
private volatile boolean closed; private volatile boolean closed;
public IndexShardOperationsLock(ShardId shardId, Logger logger, ThreadPool threadPool) { IndexShardOperationPermits(ShardId shardId, Logger logger, ThreadPool threadPool) {
this.shardId = shardId; this.shardId = shardId;
this.logger = logger; this.logger = logger;
this.threadPool = threadPool; this.threadPool = threadPool;
@ -67,7 +68,7 @@ public class IndexShardOperationsLock implements Closeable {
* @param onBlocked the action to run once the block has been acquired * @param onBlocked the action to run once the block has been acquired
* @throws InterruptedException if calling thread is interrupted * @throws InterruptedException if calling thread is interrupted
* @throws TimeoutException if timed out waiting for in-flight operations to finish * @throws TimeoutException if timed out waiting for in-flight operations to finish
* @throws IndexShardClosedException if operation lock has been closed * @throws IndexShardClosedException if operation permit has been closed
*/ */
public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) throws InterruptedException, TimeoutException { public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) throws InterruptedException, TimeoutException {
if (closed) { if (closed) {
@ -75,6 +76,7 @@ public class IndexShardOperationsLock implements Closeable {
} }
try { try {
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
assert semaphore.availablePermits() == 0;
try { try {
onBlocked.run(); onBlocked.run();
} finally { } finally {
@ -91,7 +93,7 @@ public class IndexShardOperationsLock implements Closeable {
} }
if (queuedActions != null) { if (queuedActions != null) {
// Try acquiring permits on fresh thread (for two reasons): // Try acquiring permits on fresh thread (for two reasons):
// - blockOperations is called on recovery thread which can be expected to be interrupted when recovery is cancelled. // - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled.
// Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by // Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by
// ThreadedActionListener if the queue of the thread pool on which it submits is full. // ThreadedActionListener if the queue of the thread pool on which it submits is full.
// - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure // - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure
@ -106,12 +108,12 @@ public class IndexShardOperationsLock implements Closeable {
} }
/** /**
* Acquires a lock whenever lock acquisition is not blocked. If the lock is directly available, the provided * Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
* ActionListener will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)}, lock * {@link ActionListener} will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)},
* acquisition can be delayed. The provided ActionListener will then be called using the provided executor once blockOperations * permit acquisition can be delayed. The provided ActionListener will then be called using the provided executor once operations are no
* terminates. * longer blocked.
* *
* @param onAcquired ActionListener that is invoked once acquisition is successful or failed * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
* @param executorOnDelay executor to use for delayed call * @param executorOnDelay executor to use for delayed call
* @param forceExecution whether the runnable should force its execution in case it gets rejected * @param forceExecution whether the runnable should force its execution in case it gets rejected
*/ */

View File

@ -561,6 +561,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes); allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
final Set<String> initializingIds = final Set<String> initializingIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()));
shard.updateAllocationIdsFromMaster(activeIds, initializingIds); shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
} }
} catch (Exception e) { } catch (Exception e) {
@ -737,6 +738,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
*/ */
void updateRoutingEntry(ShardRouting shardRouting) throws IOException; void updateRoutingEntry(ShardRouting shardRouting) throws IOException;
/**
* Update the primary term. This method should only be invoked on primary shards.
*
* @param primaryTerm the new primary term
*/
void updatePrimaryTerm(long primaryTerm);
/** /**
* Notifies the service of the current allocation ids in the cluster state. * Notifies the service of the current allocation ids in the cluster state.
* See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details. * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.

View File

@ -48,11 +48,4 @@ public interface ScriptEngine extends Closeable {
ExecutableScript executable(CompiledScript compiledScript, @Nullable Map<String, Object> vars); ExecutableScript executable(CompiledScript compiledScript, @Nullable Map<String, Object> vars);
SearchScript search(CompiledScript compiledScript, SearchLookup lookup, @Nullable Map<String, Object> vars); SearchScript search(CompiledScript compiledScript, SearchLookup lookup, @Nullable Map<String, Object> vars);
/**
* Returns <code>true</code> if this scripting engine can safely accept inline scripts by default. The default is <code>false</code>
*/
default boolean isInlineScriptEnabled() {
return false;
}
} }

View File

@ -1,77 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.script;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
public class ScriptEngineRegistry {
private final Map<Class<? extends ScriptEngine>, String> registeredScriptEngineServices;
private final Map<String, ScriptEngine> registeredLanguages;
private final Map<String, Boolean> defaultInlineScriptEnableds;
public ScriptEngineRegistry(Iterable<ScriptEngine> registrations) {
Objects.requireNonNull(registrations);
Map<Class<? extends ScriptEngine>, String> registeredScriptEngineServices = new HashMap<>();
Map<String, ScriptEngine> registeredLanguages = new HashMap<>();
Map<String, Boolean> inlineScriptEnableds = new HashMap<>();
for (ScriptEngine service : registrations) {
String oldLanguage = registeredScriptEngineServices.putIfAbsent(service.getClass(),
service.getType());
if (oldLanguage != null) {
throw new IllegalArgumentException("script engine service [" + service.getClass() +
"] already registered for language [" + oldLanguage + "]");
}
String language = service.getType();
ScriptEngine scriptEngine =
registeredLanguages.putIfAbsent(language, service);
if (scriptEngine != null) {
throw new IllegalArgumentException("scripting language [" + language + "] already registered for script engine service [" +
scriptEngine.getClass().getCanonicalName() + "]");
}
inlineScriptEnableds.put(language, service.isInlineScriptEnabled());
}
this.registeredScriptEngineServices = Collections.unmodifiableMap(registeredScriptEngineServices);
this.registeredLanguages = Collections.unmodifiableMap(registeredLanguages);
this.defaultInlineScriptEnableds = Collections.unmodifiableMap(inlineScriptEnableds);
}
Iterable<Class<? extends ScriptEngine>> getRegisteredScriptEngineServices() {
return registeredScriptEngineServices.keySet();
}
String getLanguage(Class<? extends ScriptEngine> scriptEngineService) {
Objects.requireNonNull(scriptEngineService);
return registeredScriptEngineServices.get(scriptEngineService);
}
public Map<String, ScriptEngine> getRegisteredLanguages() {
return registeredLanguages;
}
public Map<String, Boolean> getDefaultInlineScriptEnableds() {
return this.defaultInlineScriptEnableds;
}
}

View File

@ -24,7 +24,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.ScriptPlugin; import org.elasticsearch.plugins.ScriptPlugin;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -52,14 +55,26 @@ public class ScriptModule {
public ScriptModule(Settings settings, List<ScriptEngine> scriptEngines, public ScriptModule(Settings settings, List<ScriptEngine> scriptEngines,
List<ScriptContext.Plugin> customScriptContexts) { List<ScriptContext.Plugin> customScriptContexts) {
ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(customScriptContexts); ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(customScriptContexts);
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(scriptEngines); Map<String, ScriptEngine> enginesByName = getEnginesByName(scriptEngines);
try { try {
scriptService = new ScriptService(settings, scriptEngineRegistry, scriptContextRegistry); scriptService = new ScriptService(settings, enginesByName, scriptContextRegistry);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Couldn't setup ScriptService", e); throw new RuntimeException("Couldn't setup ScriptService", e);
} }
} }
private Map<String, ScriptEngine> getEnginesByName(List<ScriptEngine> engines) {
Map<String, ScriptEngine> enginesByName = new HashMap<>();
for (ScriptEngine engine : engines) {
ScriptEngine existing = enginesByName.put(engine.getType(), engine);
if (existing != null) {
throw new IllegalArgumentException("scripting language [" + engine.getType() + "] defined for engine [" +
existing.getClass().getName() + "] and [" + engine.getClass().getName());
}
}
return Collections.unmodifiableMap(enginesByName);
}
/** /**
* Service responsible for managing scripts. * Service responsible for managing scripts.
*/ */

View File

@ -96,11 +96,11 @@ public class ScriptService extends AbstractComponent implements Closeable, Clust
private double scriptsPerMinCounter; private double scriptsPerMinCounter;
private double compilesAllowedPerNano; private double compilesAllowedPerNano;
public ScriptService(Settings settings, ScriptEngineRegistry scriptEngineRegistry, ScriptContextRegistry scriptContextRegistry) throws IOException { public ScriptService(Settings settings, Map<String, ScriptEngine> engines, ScriptContextRegistry scriptContextRegistry) throws IOException {
super(settings); super(settings);
Objects.requireNonNull(settings); Objects.requireNonNull(settings);
Objects.requireNonNull(scriptEngineRegistry); this.engines = Objects.requireNonNull(engines);
Objects.requireNonNull(scriptContextRegistry); Objects.requireNonNull(scriptContextRegistry);
if (Strings.hasLength(settings.get(DISABLE_DYNAMIC_SCRIPTING_SETTING))) { if (Strings.hasLength(settings.get(DISABLE_DYNAMIC_SCRIPTING_SETTING))) {
@ -192,8 +192,6 @@ public class ScriptService extends AbstractComponent implements Closeable, Clust
logger.debug("using script cache with max_size [{}], expire [{}]", cacheMaxSize, cacheExpire); logger.debug("using script cache with max_size [{}], expire [{}]", cacheMaxSize, cacheExpire);
this.cache = cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build(); this.cache = cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build();
this.engines = scriptEngineRegistry.getRegisteredLanguages();
this.lastInlineCompileTime = System.nanoTime(); this.lastInlineCompileTime = System.nanoTime();
this.setMaxCompilationsPerMinute(SCRIPT_MAX_COMPILATIONS_PER_MINUTE.get(settings)); this.setMaxCompilationsPerMinute(SCRIPT_MAX_COMPILATIONS_PER_MINUTE.get(settings));
} }
@ -510,14 +508,6 @@ public class ScriptService extends AbstractComponent implements Closeable, Clust
return getEngine(compiledScript.lang()).executable(compiledScript, params); return getEngine(compiledScript.lang()).executable(compiledScript, params);
} }
/**
* Compiles (or retrieves from cache) and executes the provided search script
*/
public SearchScript search(SearchLookup lookup, Script script, ScriptContext scriptContext) {
CompiledScript compiledScript = compile(script, scriptContext);
return search(lookup, compiledScript, script.getParams());
}
/** /**
* Binds provided parameters to a compiled script returning a * Binds provided parameters to a compiled script returning a
* {@link SearchScript} ready for execution * {@link SearchScript} ready for execution

View File

@ -48,6 +48,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ScriptContext; import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.SearchScript; import org.elasticsearch.script.SearchScript;
@ -685,7 +686,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
} }
if (source.scriptFields() != null) { if (source.scriptFields() != null) {
for (org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) { for (org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) {
SearchScript searchScript = scriptService.search(context.lookup(), field.script(), ScriptContext.Standard.SEARCH); CompiledScript compile = scriptService.compile(field.script(), ScriptContext.Standard.SEARCH);
SearchScript searchScript = scriptService.search(context.lookup(), compile, field.script().getParams());
context.scriptFields().add(new ScriptField(field.fieldName(), searchScript, field.ignoreFailure())); context.scriptFields().add(new ScriptField(field.fieldName(), searchScript, field.ignoreFailure()));
} }
} }

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -66,7 +67,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
BulkRequest bulkRequest = new BulkRequest(); BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest("no")); bulkRequest.add(new IndexRequest("no"));
bulkRequest.add(new IndexRequest("can't")); bulkRequest.add(new IndexRequest("can't"));
bulkRequest.add(new DeleteRequest("do")); bulkRequest.add(new DeleteRequest("do").version(0).versionType(VersionType.EXTERNAL));
bulkRequest.add(new UpdateRequest("nothin", randomAlphaOfLength(5), randomAlphaOfLength(5))); bulkRequest.add(new UpdateRequest("nothin", randomAlphaOfLength(5), randomAlphaOfLength(5)));
indicesThatCannotBeCreatedTestCase(new HashSet<>(Arrays.asList("no", "can't", "do", "nothin")), bulkRequest, index -> { indicesThatCannotBeCreatedTestCase(new HashSet<>(Arrays.asList("no", "can't", "do", "nothin")), bulkRequest, index -> {
throw new IndexNotFoundException("Can't make it because I say so"); throw new IndexNotFoundException("Can't make it because I say so");

View File

@ -0,0 +1,135 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.Before;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
public class TransportBulkActionTests extends ESTestCase {
/** Services needed by bulk action */
private TransportService transportService;
private ClusterService clusterService;
private ThreadPool threadPool;
private TestTransportBulkAction bulkAction;
class TestTransportBulkAction extends TransportBulkAction {
boolean indexCreated = false; // set when the "real" index is created
TestTransportBulkAction() {
super(Settings.EMPTY, TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null,
null, new ActionFilters(Collections.emptySet()), new Resolver(Settings.EMPTY),
new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(Settings.EMPTY)));
}
@Override
protected boolean needToCheck() {
return true;
}
@Override
void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
indexCreated = true;
listener.onResponse(null);
}
}
@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("TransportBulkActionTookTests");
clusterService = createClusterService(threadPool);
CapturingTransport capturingTransport = new CapturingTransport();
transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(), null);
transportService.start();
transportService.acceptIncomingRequests();
bulkAction = new TestTransportBulkAction();
}
@After
public void tearDown() throws Exception {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
clusterService.close();
super.tearDown();
}
public void testDeleteNonExistingDocDoesNotCreateIndex() throws Exception {
BulkRequest bulkRequest = new BulkRequest().add(new DeleteRequest("index", "type", "id"));
bulkAction.execute(null, bulkRequest, ActionListener.wrap(response -> {
assertFalse(bulkAction.indexCreated);
BulkItemResponse[] bulkResponses = ((BulkResponse) response).getItems();
assertEquals(bulkResponses.length, 1);
assertTrue(bulkResponses[0].isFailed());
assertTrue(bulkResponses[0].getFailure().getCause() instanceof IndexNotFoundException);
assertEquals("index", bulkResponses[0].getFailure().getIndex());
}, exception -> {
throw new AssertionError(exception);
}));
}
public void testDeleteNonExistingDocExternalVersionCreatesIndex() throws Exception {
BulkRequest bulkRequest = new BulkRequest()
.add(new DeleteRequest("index", "type", "id").versionType(VersionType.EXTERNAL).version(0));
bulkAction.execute(null, bulkRequest, ActionListener.wrap(response -> {
assertTrue(bulkAction.indexCreated);
}, exception -> {
throw new AssertionError(exception);
}));
}
public void testDeleteNonExistingDocExternalGteVersionCreatesIndex() throws Exception {
BulkRequest bulkRequest = new BulkRequest()
.add(new DeleteRequest("index2", "type", "id").versionType(VersionType.EXTERNAL_GTE).version(0));
bulkAction.execute(null, bulkRequest, ActionListener.wrap(response -> {
assertTrue(bulkAction.indexCreated);
}, exception -> {
throw new AssertionError(exception);
}));
}
}

View File

@ -1091,7 +1091,7 @@ public class TransportReplicationActionTests extends ESTestCase {
count.incrementAndGet(); count.incrementAndGet();
callback.onResponse(count::decrementAndGet); callback.onResponse(count::decrementAndGet);
return null; return null;
}).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString()); }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString());
doAnswer(invocation -> { doAnswer(invocation -> {
long term = (Long)invocation.getArguments()[0]; long term = (Long)invocation.getArguments()[0];
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1]; ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1];
@ -1103,7 +1103,7 @@ public class TransportReplicationActionTests extends ESTestCase {
count.incrementAndGet(); count.incrementAndGet();
callback.onResponse(count::decrementAndGet); callback.onResponse(count::decrementAndGet);
return null; return null;
}).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString()); }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString());
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
final ClusterState state = clusterService.state(); final ClusterState state = clusterService.state();
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());

View File

@ -444,7 +444,7 @@ public class TransportWriteActionTests extends ESTestCase {
count.incrementAndGet(); count.incrementAndGet();
callback.onResponse(count::decrementAndGet); callback.onResponse(count::decrementAndGet);
return null; return null;
}).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString()); }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString());
doAnswer(invocation -> { doAnswer(invocation -> {
long term = (Long)invocation.getArguments()[0]; long term = (Long)invocation.getArguments()[0];
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1]; ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1];
@ -456,7 +456,7 @@ public class TransportWriteActionTests extends ESTestCase {
count.incrementAndGet(); count.incrementAndGet();
callback.onResponse(count::decrementAndGet); callback.onResponse(count::decrementAndGet);
return null; return null;
}).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString()); }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString());
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
final ClusterState state = clusterService.state(); final ClusterState state = clusterService.state();
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());

View File

@ -43,12 +43,10 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContextRegistry; import org.elasticsearch.script.ScriptContextRegistry;
import org.elasticsearch.script.ScriptEngineRegistry;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType; import org.elasticsearch.script.ScriptType;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.RandomObjects; import org.elasticsearch.test.RandomObjects;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;
@ -60,7 +58,6 @@ import java.util.function.Function;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent; import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.elasticsearch.script.MockScriptEngine.mockInlineScript; import static org.elasticsearch.script.MockScriptEngine.mockInlineScript;
@ -140,14 +137,10 @@ public class UpdateRequestTests extends ESTestCase {
scripts.put("return", vars -> null); scripts.put("return", vars -> null);
final ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(emptyList()); final ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(emptyList());
final MockScriptEngine engine = new MockScriptEngine("mock", scripts); final MockScriptEngine engine = new MockScriptEngine("mock", scripts);
final ScriptEngineRegistry scriptEngineRegistry =
new ScriptEngineRegistry(singletonList(engine));
final ResourceWatcherService watcherService =
new ResourceWatcherService(baseSettings, null);
ScriptService scriptService = new ScriptService( ScriptService scriptService = new ScriptService(
baseSettings, baseSettings,
scriptEngineRegistry, Collections.singletonMap(engine.getType(), engine),
scriptContextRegistry); scriptContextRegistry);
final Settings settings = settings(Version.CURRENT).build(); final Settings settings = settings(Version.CURRENT).build();

View File

@ -18,9 +18,12 @@
*/ */
package org.elasticsearch.cluster.routing.allocation; package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
@ -48,10 +51,23 @@ public class ShardStateIT extends ESIntegTestCase {
indicesService.indexService(resolveIndex("test")).getShard(shard).failShard("simulated test failure", null); indicesService.indexService(resolveIndex("test")).getShard(shard).failShard("simulated test failure", null);
logger.info("--> waiting for a yellow index"); logger.info("--> waiting for a yellow index");
// JDK 9 type inference gets confused, so we have to help the ensureYellow();
// type inference
assertBusy(((Runnable) () -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(), // this forces the primary term to propagate to the replicas
equalTo(ClusterHealthStatus.YELLOW)))); int id = 0;
while (true) {
// find an ID that routes to the right shard, we will only index to the shard that saw a primary failure
final String idAsString = Integer.toString(id);
final int hash = Math.floorMod(Murmur3HashFunction.hash(idAsString), 2);
if (hash == shard) {
client()
.index(new IndexRequest("test", "type", idAsString).source("{ \"f\": \"" + idAsString + "\"}", XContentType.JSON))
.get();
break;
} else {
id++;
}
}
final long term0 = shard == 0 ? 2 : 1; final long term0 = shard == 0 ? 2 : 1;
final long term1 = shard == 1 ? 2 : 1; final long term1 = shard == 1 ? 2 : 1;
@ -63,13 +79,13 @@ public class ShardStateIT extends ESIntegTestCase {
assertPrimaryTerms(term0, term1); assertPrimaryTerms(term0, term1);
} }
protected void assertPrimaryTerms(long term0, long term1) { protected void assertPrimaryTerms(long shard0Term, long shard1Term) {
for (String node : internalCluster().getNodeNames()) { for (String node : internalCluster().getNodeNames()) {
logger.debug("--> asserting primary terms terms on [{}]", node); logger.debug("--> asserting primary terms terms on [{}]", node);
ClusterState state = client(node).admin().cluster().prepareState().setLocal(true).get().getState(); ClusterState state = client(node).admin().cluster().prepareState().setLocal(true).get().getState();
IndexMetaData metaData = state.metaData().index("test"); IndexMetaData metaData = state.metaData().index("test");
assertThat(metaData.primaryTerm(0), equalTo(term0)); assertThat(metaData.primaryTerm(0), equalTo(shard0Term));
assertThat(metaData.primaryTerm(1), equalTo(term1)); assertThat(metaData.primaryTerm(1), equalTo(shard1Term));
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService(metaData.getIndex()); IndexService indexService = indicesService.indexService(metaData.getIndex());
if (indexService != null) { if (indexService != null) {

View File

@ -68,7 +68,6 @@ import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedInd
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.script.ScriptContextRegistry; import org.elasticsearch.script.ScriptContextRegistry;
import org.elasticsearch.script.ScriptEngineRegistry;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ClusterServiceUtils;
@ -78,14 +77,12 @@ import org.elasticsearch.test.TestSearchContext;
import org.elasticsearch.test.engine.MockEngineFactory; import org.elasticsearch.test.engine.MockEngineFactory;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
public class IndexModuleTests extends ESTestCase { public class IndexModuleTests extends ESTestCase {
@ -128,9 +125,8 @@ public class IndexModuleTests extends ESTestCase {
threadPool = new TestThreadPool("test"); threadPool = new TestThreadPool("test");
circuitBreakerService = new NoneCircuitBreakerService(); circuitBreakerService = new NoneCircuitBreakerService();
bigArrays = new BigArrays(settings, circuitBreakerService); bigArrays = new BigArrays(settings, circuitBreakerService);
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(emptyList());
ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList()); ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList());
scriptService = new ScriptService(settings, scriptEngineRegistry, scriptContextRegistry); scriptService = new ScriptService(settings, Collections.emptyMap(), scriptContextRegistry);
clusterService = ClusterServiceUtils.createClusterService(threadPool); clusterService = ClusterServiceUtils.createClusterService(threadPool);
nodeEnvironment = new NodeEnvironment(settings, environment); nodeEnvironment = new NodeEnvironment(settings, environment);
mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();

View File

@ -47,6 +47,8 @@ import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
@ -59,6 +61,7 @@ import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequest;
import java.io.IOException; import java.io.IOException;
@ -225,7 +228,6 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
assert shardRoutings().stream() assert shardRoutings().stream()
.filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false : .filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false :
"replica with aId [" + replica.routingEntry().allocationId() + "] already exists"; "replica with aId [" + replica.routingEntry().allocationId() + "] already exists";
replica.updatePrimaryTerm(primary.getPrimaryTerm());
replicas.add(replica); replicas.add(replica);
updateAllocationIDsOnPrimary(); updateAllocationIDsOnPrimary();
} }
@ -254,17 +256,13 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
*/ */
public synchronized void promoteReplicaToPrimary(IndexShard replica) throws IOException { public synchronized void promoteReplicaToPrimary(IndexShard replica) throws IOException {
final long newTerm = indexMetaData.primaryTerm(shardId.id()) + 1; final long newTerm = indexMetaData.primaryTerm(shardId.id()) + 1;
IndexMetaData.Builder newMetaData = IndexMetaData.Builder newMetaData = IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm);
IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm);
indexMetaData = newMetaData.build(); indexMetaData = newMetaData.build();
for (IndexShard shard: replicas) { assertTrue(replicas.remove(replica));
shard.updatePrimaryTerm(newTerm);
}
boolean found = replicas.remove(replica);
assert found;
closeShards(primary); closeShards(primary);
primary = replica; primary = replica;
replica.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary()); primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary());
primary.updatePrimaryTerm(newTerm);
updateAllocationIDsOnPrimary(); updateAllocationIDsOnPrimary();
} }
@ -476,17 +474,34 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
final ReplicaRequest request, final ReplicaRequest request,
final long globalCheckpoint, final long globalCheckpoint,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) { final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
try {
IndexShard replica = replicationGroup.replicas.stream() IndexShard replica = replicationGroup.replicas.stream()
.filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get(); .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get();
replica.acquireReplicaOperationPermit(
request.primaryTerm(),
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
try {
replica.updateGlobalCheckpointOnReplica(globalCheckpoint); replica.updateGlobalCheckpointOnReplica(globalCheckpoint);
performOnReplica(request, replica); performOnReplica(request, replica);
listener.onResponse(new ReplicaResponse(replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint())); releasable.close();
} catch (Exception e) { listener.onResponse(
new ReplicaResponse(
replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint()));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable);
listener.onFailure(e); listener.onFailure(e);
} }
} }
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
},
ThreadPool.Names.INDEX);
}
@Override @Override
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Runnable onSuccess, Consumer<Exception> onPrimaryDemoted,

View File

@ -37,16 +37,15 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
public class IndexShardOperationsLockTests extends ESTestCase { public class IndexShardOperationPermitsTests extends ESTestCase {
private static ThreadPool threadPool; private static ThreadPool threadPool;
private IndexShardOperationsLock block; private IndexShardOperationPermits permits;
@BeforeClass @BeforeClass
public static void setupThreadPool() { public static void setupThreadPool() {
@ -61,13 +60,13 @@ public class IndexShardOperationsLockTests extends ESTestCase {
@Before @Before
public void createIndexShardOperationsLock() { public void createIndexShardOperationsLock() {
block = new IndexShardOperationsLock(new ShardId("blubb", "id", 0), logger, threadPool); permits = new IndexShardOperationPermits(new ShardId("blubb", "id", 0), logger, threadPool);
} }
@After @After
public void checkNoInflightOperations() { public void checkNoInflightOperations() {
assertThat(block.semaphore.availablePermits(), equalTo(Integer.MAX_VALUE)); assertThat(permits.semaphore.availablePermits(), equalTo(Integer.MAX_VALUE));
assertThat(block.getActiveOperationsCount(), equalTo(0)); assertThat(permits.getActiveOperationsCount(), equalTo(0));
} }
public void testAllOperationsInvoked() throws InterruptedException, TimeoutException, ExecutionException { public void testAllOperationsInvoked() throws InterruptedException, TimeoutException, ExecutionException {
@ -87,7 +86,7 @@ public class IndexShardOperationsLockTests extends ESTestCase {
Thread thread = new Thread() { Thread thread = new Thread() {
public void run() { public void run() {
latch.countDown(); latch.countDown();
block.acquire(future, ThreadPool.Names.GENERIC, true); permits.acquire(future, ThreadPool.Names.GENERIC, true);
} }
}; };
futures.add(future); futures.add(future);
@ -123,29 +122,29 @@ public class IndexShardOperationsLockTests extends ESTestCase {
public void testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionException, InterruptedException { public void testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> future = new PlainActionFuture<>(); PlainActionFuture<Releasable> future = new PlainActionFuture<>();
block.acquire(future, ThreadPool.Names.GENERIC, true); permits.acquire(future, ThreadPool.Names.GENERIC, true);
assertTrue(future.isDone()); assertTrue(future.isDone());
future.get().close(); future.get().close();
} }
public void testOperationsIfClosed() throws ExecutionException, InterruptedException { public void testOperationsIfClosed() throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> future = new PlainActionFuture<>(); PlainActionFuture<Releasable> future = new PlainActionFuture<>();
block.close(); permits.close();
block.acquire(future, ThreadPool.Names.GENERIC, true); permits.acquire(future, ThreadPool.Names.GENERIC, true);
ExecutionException exception = expectThrows(ExecutionException.class, future::get); ExecutionException exception = expectThrows(ExecutionException.class, future::get);
assertThat(exception.getCause(), instanceOf(IndexShardClosedException.class)); assertThat(exception.getCause(), instanceOf(IndexShardClosedException.class));
} }
public void testBlockIfClosed() throws ExecutionException, InterruptedException { public void testBlockIfClosed() throws ExecutionException, InterruptedException {
block.close(); permits.close();
expectThrows(IndexShardClosedException.class, () -> block.blockOperations(randomInt(10), TimeUnit.MINUTES, expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES,
() -> { throw new IllegalArgumentException("fake error"); })); () -> { throw new IllegalArgumentException("fake error"); }));
} }
public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException { public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException {
PlainActionFuture<Releasable> future = new PlainActionFuture<>(); PlainActionFuture<Releasable> future = new PlainActionFuture<>();
try (Releasable releasable = blockAndWait()) { try (Releasable releasable = blockAndWait()) {
block.acquire(future, ThreadPool.Names.GENERIC, true); permits.acquire(future, ThreadPool.Names.GENERIC, true);
assertFalse(future.isDone()); assertFalse(future.isDone());
} }
future.get(1, TimeUnit.HOURS).close(); future.get(1, TimeUnit.HOURS).close();
@ -192,8 +191,8 @@ public class IndexShardOperationsLockTests extends ESTestCase {
context.putHeader("foo", "bar"); context.putHeader("foo", "bar");
context.putTransient("bar", "baz"); context.putTransient("bar", "baz");
// test both with and without a executor name // test both with and without a executor name
block.acquire(future, ThreadPool.Names.GENERIC, true); permits.acquire(future, ThreadPool.Names.GENERIC, true);
block.acquire(future2, null, true); permits.acquire(future2, null, true);
} }
assertFalse(future.isDone()); assertFalse(future.isDone());
} }
@ -209,7 +208,7 @@ public class IndexShardOperationsLockTests extends ESTestCase {
IndexShardClosedException exception = new IndexShardClosedException(new ShardId("blubb", "id", 0)); IndexShardClosedException exception = new IndexShardClosedException(new ShardId("blubb", "id", 0));
threadPool.generic().execute(() -> { threadPool.generic().execute(() -> {
try { try {
block.blockOperations(1, TimeUnit.MINUTES, () -> { permits.blockOperations(1, TimeUnit.MINUTES, () -> {
try { try {
blockAcquired.countDown(); blockAcquired.countDown();
releaseBlock.await(); releaseBlock.await();
@ -241,31 +240,31 @@ public class IndexShardOperationsLockTests extends ESTestCase {
public void testActiveOperationsCount() throws ExecutionException, InterruptedException { public void testActiveOperationsCount() throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> future1 = new PlainActionFuture<>(); PlainActionFuture<Releasable> future1 = new PlainActionFuture<>();
block.acquire(future1, ThreadPool.Names.GENERIC, true); permits.acquire(future1, ThreadPool.Names.GENERIC, true);
assertTrue(future1.isDone()); assertTrue(future1.isDone());
assertThat(block.getActiveOperationsCount(), equalTo(1)); assertThat(permits.getActiveOperationsCount(), equalTo(1));
PlainActionFuture<Releasable> future2 = new PlainActionFuture<>(); PlainActionFuture<Releasable> future2 = new PlainActionFuture<>();
block.acquire(future2, ThreadPool.Names.GENERIC, true); permits.acquire(future2, ThreadPool.Names.GENERIC, true);
assertTrue(future2.isDone()); assertTrue(future2.isDone());
assertThat(block.getActiveOperationsCount(), equalTo(2)); assertThat(permits.getActiveOperationsCount(), equalTo(2));
future1.get().close(); future1.get().close();
assertThat(block.getActiveOperationsCount(), equalTo(1)); assertThat(permits.getActiveOperationsCount(), equalTo(1));
future1.get().close(); // check idempotence future1.get().close(); // check idempotence
assertThat(block.getActiveOperationsCount(), equalTo(1)); assertThat(permits.getActiveOperationsCount(), equalTo(1));
future2.get().close(); future2.get().close();
assertThat(block.getActiveOperationsCount(), equalTo(0)); assertThat(permits.getActiveOperationsCount(), equalTo(0));
try (Releasable releasable = blockAndWait()) { try (Releasable releasable = blockAndWait()) {
assertThat(block.getActiveOperationsCount(), equalTo(0)); assertThat(permits.getActiveOperationsCount(), equalTo(0));
} }
PlainActionFuture<Releasable> future3 = new PlainActionFuture<>(); PlainActionFuture<Releasable> future3 = new PlainActionFuture<>();
block.acquire(future3, ThreadPool.Names.GENERIC, true); permits.acquire(future3, ThreadPool.Names.GENERIC, true);
assertTrue(future3.isDone()); assertTrue(future3.isDone());
assertThat(block.getActiveOperationsCount(), equalTo(1)); assertThat(permits.getActiveOperationsCount(), equalTo(1));
future3.get().close(); future3.get().close();
assertThat(block.getActiveOperationsCount(), equalTo(0)); assertThat(permits.getActiveOperationsCount(), equalTo(0));
} }
} }

View File

@ -33,6 +33,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStats;
@ -118,8 +119,12 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@ -130,11 +135,13 @@ import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
@ -262,20 +269,20 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(indexShard); closeShards(indexShard);
assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); assertThat(indexShard.getActiveOperationsCount(), equalTo(0));
try { try {
indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX); indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX);
fail("we should not be able to increment anymore"); fail("we should not be able to increment anymore");
} catch (IndexShardClosedException e) { } catch (IndexShardClosedException e) {
// expected // expected
} }
try { try {
indexShard.acquireReplicaOperationLock(indexShard.getPrimaryTerm(), null, ThreadPool.Names.INDEX); indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), null, ThreadPool.Names.INDEX);
fail("we should not be able to increment anymore"); fail("we should not be able to increment anymore");
} catch (IndexShardClosedException e) { } catch (IndexShardClosedException e) {
// expected // expected
} }
} }
public void testOperationLocksOnPrimaryShards() throws InterruptedException, ExecutionException, IOException { public void testOperationPermitsOnPrimaryShards() throws InterruptedException, ExecutionException, IOException {
final ShardId shardId = new ShardId("test", "_na_", 0); final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard; final IndexShard indexShard;
@ -287,10 +294,10 @@ public class IndexShardTests extends IndexShardTestCase {
// simulate promotion // simulate promotion
indexShard = newStartedShard(false); indexShard = newStartedShard(false);
ShardRouting replicaRouting = indexShard.routingEntry(); ShardRouting replicaRouting = indexShard.routingEntry();
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
true, ShardRoutingState.STARTED, replicaRouting.allocationId()); true, ShardRoutingState.STARTED, replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting); indexShard.updateRoutingEntry(primaryRouting);
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
} else { } else {
indexShard = newStartedShard(true); indexShard = newStartedShard(true);
} }
@ -298,15 +305,15 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(0, indexShard.getActiveOperationsCount()); assertEquals(0, indexShard.getActiveOperationsCount());
if (indexShard.routingEntry().isRelocationTarget() == false) { if (indexShard.routingEntry().isRelocationTarget() == false) {
try { try {
indexShard.acquireReplicaOperationLock(primaryTerm, null, ThreadPool.Names.INDEX); indexShard.acquireReplicaOperationPermit(primaryTerm, null, ThreadPool.Names.INDEX);
fail("shard shouldn't accept operations as replica"); fail("shard shouldn't accept operations as replica");
} catch (IllegalStateException ignored) { } catch (IllegalStateException ignored) {
} }
} }
Releasable operation1 = acquirePrimaryOperationLockBlockingly(indexShard); Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard);
assertEquals(1, indexShard.getActiveOperationsCount()); assertEquals(1, indexShard.getActiveOperationsCount());
Releasable operation2 = acquirePrimaryOperationLockBlockingly(indexShard); Releasable operation2 = acquirePrimaryOperationPermitBlockingly(indexShard);
assertEquals(2, indexShard.getActiveOperationsCount()); assertEquals(2, indexShard.getActiveOperationsCount());
Releasables.close(operation1, operation2); Releasables.close(operation1, operation2);
@ -315,20 +322,20 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(indexShard); closeShards(indexShard);
} }
private Releasable acquirePrimaryOperationLockBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> fut = new PlainActionFuture<>(); PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
indexShard.acquirePrimaryOperationLock(fut, ThreadPool.Names.INDEX); indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX);
return fut.get(); return fut.get();
} }
private Releasable acquireReplicaOperationLockBlockingly(IndexShard indexShard, long opPrimaryTerm) private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm)
throws ExecutionException, InterruptedException { throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> fut = new PlainActionFuture<>(); PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
indexShard.acquireReplicaOperationLock(opPrimaryTerm, fut, ThreadPool.Names.INDEX); indexShard.acquireReplicaOperationPermit(opPrimaryTerm, fut, ThreadPool.Names.INDEX);
return fut.get(); return fut.get();
} }
public void testOperationLocksOnReplicaShards() throws InterruptedException, ExecutionException, IOException { public void testOperationPermitOnReplicaShards() throws InterruptedException, ExecutionException, IOException, BrokenBarrierException {
final ShardId shardId = new ShardId("test", "_na_", 0); final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard; final IndexShard indexShard;
@ -367,33 +374,165 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(0, indexShard.getActiveOperationsCount()); assertEquals(0, indexShard.getActiveOperationsCount());
if (shardRouting.primary() == false) { if (shardRouting.primary() == false) {
try { final IllegalStateException e =
indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX); expectThrows(IllegalStateException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX));
fail("shard shouldn't accept primary ops"); assertThat(e, hasToString(containsString("shard is not a primary")));
} catch (IllegalStateException ignored) {
}
} }
final long primaryTerm = indexShard.getPrimaryTerm(); final long primaryTerm = indexShard.getPrimaryTerm();
Releasable operation1 = acquireReplicaOperationLockBlockingly(indexShard, primaryTerm); final Releasable operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
assertEquals(1, indexShard.getActiveOperationsCount()); assertEquals(1, indexShard.getActiveOperationsCount());
Releasable operation2 = acquireReplicaOperationLockBlockingly(indexShard, primaryTerm); final Releasable operation2 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
assertEquals(2, indexShard.getActiveOperationsCount()); assertEquals(2, indexShard.getActiveOperationsCount());
try { {
indexShard.acquireReplicaOperationLock(primaryTerm - 1, null, ThreadPool.Names.INDEX); final AtomicBoolean onResponse = new AtomicBoolean();
fail("you can not increment the operation counter with an older primary term"); final AtomicBoolean onFailure = new AtomicBoolean();
} catch (IllegalArgumentException e) { final AtomicReference<Exception> onFailureException = new AtomicReference<>();
assertThat(e.getMessage(), containsString("operation term")); ActionListener<Releasable> onLockAcquired = new ActionListener<Releasable>() {
assertThat(e.getMessage(), containsString("too old")); @Override
public void onResponse(Releasable releasable) {
onResponse.set(true);
} }
// but you can increment with a newer one.. @Override
acquireReplicaOperationLockBlockingly(indexShard, primaryTerm + 1 + randomInt(20)).close(); public void onFailure(Exception e) {
Releasables.close(operation1, operation2); onFailure.set(true);
onFailureException.set(e);
}
};
indexShard.acquireReplicaOperationPermit(primaryTerm - 1, onLockAcquired, ThreadPool.Names.INDEX);
assertFalse(onResponse.get());
assertTrue(onFailure.get());
assertThat(onFailureException.get(), instanceOf(IllegalStateException.class));
assertThat(
onFailureException.get(), hasToString(containsString("operation primary term [" + (primaryTerm - 1) + "] is too old")));
}
{
final AtomicBoolean onResponse = new AtomicBoolean();
final AtomicBoolean onFailure = new AtomicBoolean();
final CyclicBarrier barrier = new CyclicBarrier(2);
// but you can not increment with a new primary term until the operations on the older primary term complete
final Thread thread = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
indexShard.acquireReplicaOperationPermit(
primaryTerm + 1 + randomInt(20),
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
onResponse.set(true);
releasable.close();
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void onFailure(Exception e) {
onFailure.set(true);
}
},
ThreadPool.Names.SAME);
});
thread.start();
barrier.await();
// our operation should be blocked until the previous operations complete
assertFalse(onResponse.get());
assertFalse(onFailure.get());
Releasables.close(operation1);
// our operation should still be blocked
assertFalse(onResponse.get());
assertFalse(onFailure.get());
Releasables.close(operation2);
barrier.await();
// now lock acquisition should have succeeded
assertTrue(onResponse.get());
assertFalse(onFailure.get());
thread.join();
assertEquals(0, indexShard.getActiveOperationsCount()); assertEquals(0, indexShard.getActiveOperationsCount());
}
closeShards(indexShard);
}
public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException {
final IndexShard indexShard = newStartedShard(false);
final CyclicBarrier barrier = new CyclicBarrier(3);
final CountDownLatch latch = new CountDownLatch(2);
final long primaryTerm = indexShard.getPrimaryTerm();
final AtomicLong counter = new AtomicLong();
final AtomicReference<Exception> onFailure = new AtomicReference<>();
final LongFunction<Runnable> function = increment -> () -> {
assert increment > 0;
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
indexShard.acquireReplicaOperationPermit(
primaryTerm + increment,
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
counter.incrementAndGet();
assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm + increment));
latch.countDown();
releasable.close();
}
@Override
public void onFailure(Exception e) {
onFailure.set(e);
latch.countDown();
}
},
ThreadPool.Names.INDEX);
};
final long firstIncrement = 1 + (randomBoolean() ? 0 : 1);
final long secondIncrement = 1 + (randomBoolean() ? 0 : 1);
final Thread first = new Thread(function.apply(firstIncrement));
final Thread second = new Thread(function.apply(secondIncrement));
first.start();
second.start();
// the two threads synchronize attempting to acquire an operation permit
barrier.await();
// we wait for both operations to complete
latch.await();
first.join();
second.join();
final Exception e;
if ((e = onFailure.get()) != null) {
/*
* If one thread tried to set the primary term to a higher value than the other thread and the thread with the higher term won
* the race, then the other thread lost the race and only one operation should have been executed.
*/
assertThat(e, instanceOf(IllegalStateException.class));
assertThat(e, hasToString(matches("operation primary term \\[\\d+\\] is too old")));
assertThat(counter.get(), equalTo(1L));
} else {
assertThat(counter.get(), equalTo(2L));
}
assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm + Math.max(firstIncrement, secondIncrement)));
closeShards(indexShard); closeShards(indexShard);
} }
@ -701,7 +840,7 @@ public class IndexShardTests extends IndexShardTestCase {
} }
}); });
try (Releasable ignored = acquirePrimaryOperationLockBlockingly(shard)) { try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) {
// start finalization of recovery // start finalization of recovery
recoveryThread.start(); recoveryThread.start();
latch.await(); latch.await();
@ -711,7 +850,7 @@ public class IndexShardTests extends IndexShardTestCase {
// recovery can be now finalized // recovery can be now finalized
recoveryThread.join(); recoveryThread.join();
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
try (Releasable ignored = acquirePrimaryOperationLockBlockingly(shard)) { try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) {
// lock can again be acquired // lock can again be acquired
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
} }
@ -740,7 +879,7 @@ public class IndexShardTests extends IndexShardTestCase {
super.onResponse(releasable); super.onResponse(releasable);
} }
}; };
shard.acquirePrimaryOperationLock(onLockAcquired, ThreadPool.Names.INDEX); shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.INDEX);
onLockAcquiredActions.add(onLockAcquired); onLockAcquiredActions.add(onLockAcquired);
} }
@ -764,7 +903,7 @@ public class IndexShardTests extends IndexShardTestCase {
indexThreads[i] = new Thread() { indexThreads[i] = new Thread() {
@Override @Override
public void run() { public void run() {
try (Releasable operationLock = acquirePrimaryOperationLockBlockingly(shard)) { try (Releasable operationLock = acquirePrimaryOperationPermitBlockingly(shard)) {
allPrimaryOperationLocksAcquired.countDown(); allPrimaryOperationLocksAcquired.countDown();
barrier.await(); barrier.await();
} catch (InterruptedException | BrokenBarrierException | ExecutionException e) { } catch (InterruptedException | BrokenBarrierException | ExecutionException e) {

View File

@ -362,6 +362,11 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
this.shardRouting = shardRouting; this.shardRouting = shardRouting;
} }
@Override
public void updatePrimaryTerm(long primaryTerm) {
term = primaryTerm;
}
@Override @Override
public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<String> initializingAllocationIds) { public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<String> initializingAllocationIds) {
this.activeAllocationIds = activeAllocationIds; this.activeAllocationIds = activeAllocationIds;

View File

@ -114,7 +114,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
final ShardId shardId = shard.shardId(); final ShardId shardId = shard.shardId();
PlainActionFuture<Releasable> fut = new PlainActionFuture<>(); PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
shard.acquirePrimaryOperationLock(fut, ThreadPool.Names.INDEX); shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX);
try (Releasable operationLock = fut.get()) { try (Releasable operationLock = fut.get()) {
SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>(); SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>();
flushService.attemptSyncedFlush(shardId, listener); flushService.attemptSyncedFlush(shardId, listener);

View File

@ -30,6 +30,7 @@ import org.elasticsearch.test.ESTestCase;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
public class ScriptContextTests extends ESTestCase { public class ScriptContextTests extends ESTestCase {
@ -42,13 +43,13 @@ public class ScriptContextTests extends ESTestCase {
.build(); .build();
MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME, Collections.singletonMap("1", script -> "1")); MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME, Collections.singletonMap("1", script -> "1"));
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(scriptEngine)); Map<String, ScriptEngine> engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine);
List<ScriptContext.Plugin> customContexts = Arrays.asList( List<ScriptContext.Plugin> customContexts = Arrays.asList(
new ScriptContext.Plugin(PLUGIN_NAME, "custom_op"), new ScriptContext.Plugin(PLUGIN_NAME, "custom_op"),
new ScriptContext.Plugin(PLUGIN_NAME, "custom_exp_disabled_op"), new ScriptContext.Plugin(PLUGIN_NAME, "custom_exp_disabled_op"),
new ScriptContext.Plugin(PLUGIN_NAME, "custom_globally_disabled_op")); new ScriptContext.Plugin(PLUGIN_NAME, "custom_globally_disabled_op"));
ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(customContexts); ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(customContexts);
ScriptService scriptService = new ScriptService(settings, scriptEngineRegistry, scriptContextRegistry); ScriptService scriptService = new ScriptService(settings, engines, scriptContextRegistry);
ClusterState empty = ClusterState.builder(new ClusterName("_name")).build(); ClusterState empty = ClusterState.builder(new ClusterName("_name")).build();
ScriptMetaData smd = empty.metaData().custom(ScriptMetaData.TYPE); ScriptMetaData smd = empty.metaData().custom(ScriptMetaData.TYPE);

View File

@ -54,8 +54,7 @@ public class ScriptServiceTests extends ESTestCase {
private ScriptEngine scriptEngine; private ScriptEngine scriptEngine;
private ScriptEngine dangerousScriptEngine; private ScriptEngine dangerousScriptEngine;
private Map<String, ScriptEngine> scriptEnginesByLangMap; private Map<String, ScriptEngine> engines;
private ScriptEngineRegistry scriptEngineRegistry;
private ScriptContextRegistry scriptContextRegistry; private ScriptContextRegistry scriptContextRegistry;
private ScriptContext[] scriptContexts; private ScriptContext[] scriptContexts;
private ScriptService scriptService; private ScriptService scriptService;
@ -77,7 +76,6 @@ public class ScriptServiceTests extends ESTestCase {
.put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 10000) .put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 10000)
.build(); .build();
scriptEngine = new TestEngine(); scriptEngine = new TestEngine();
dangerousScriptEngine = new TestDangerousEngine();
TestEngine defaultScriptServiceEngine = new TestEngine(Script.DEFAULT_SCRIPT_LANG) {}; TestEngine defaultScriptServiceEngine = new TestEngine(Script.DEFAULT_SCRIPT_LANG) {};
//randomly register custom script contexts //randomly register custom script contexts
int randomInt = randomIntBetween(0, 3); int randomInt = randomIntBetween(0, 3);
@ -95,8 +93,9 @@ public class ScriptServiceTests extends ESTestCase {
String context = plugin + "_" + operation; String context = plugin + "_" + operation;
contexts.put(context, new ScriptContext.Plugin(plugin, operation)); contexts.put(context, new ScriptContext.Plugin(plugin, operation));
} }
scriptEngineRegistry = new ScriptEngineRegistry(Arrays.asList(scriptEngine, dangerousScriptEngine, engines = new HashMap<>();
defaultScriptServiceEngine)); engines.put(scriptEngine.getType(), scriptEngine);
engines.put(defaultScriptServiceEngine.getType(), defaultScriptServiceEngine);
scriptContextRegistry = new ScriptContextRegistry(contexts.values()); scriptContextRegistry = new ScriptContextRegistry(contexts.values());
scriptContexts = scriptContextRegistry.scriptContexts().toArray(new ScriptContext[scriptContextRegistry.scriptContexts().size()]); scriptContexts = scriptContextRegistry.scriptContexts().toArray(new ScriptContext[scriptContextRegistry.scriptContexts().size()]);
logger.info("--> setup script service"); logger.info("--> setup script service");
@ -104,7 +103,7 @@ public class ScriptServiceTests extends ESTestCase {
private void buildScriptService(Settings additionalSettings) throws IOException { private void buildScriptService(Settings additionalSettings) throws IOException {
Settings finalSettings = Settings.builder().put(baseSettings).put(additionalSettings).build(); Settings finalSettings = Settings.builder().put(baseSettings).put(additionalSettings).build();
scriptService = new ScriptService(finalSettings, scriptEngineRegistry, scriptContextRegistry) { scriptService = new ScriptService(finalSettings, engines, scriptContextRegistry) {
@Override @Override
StoredScriptSource getScriptFromClusterState(String id, String lang) { StoredScriptSource getScriptFromClusterState(String id, String lang) {
//mock the script that gets retrieved from an index //mock the script that gets retrieved from an index
@ -245,7 +244,9 @@ public class ScriptServiceTests extends ESTestCase {
public void testSearchCountedInCompilationStats() throws IOException { public void testSearchCountedInCompilationStats() throws IOException {
buildScriptService(Settings.EMPTY); buildScriptService(Settings.EMPTY);
scriptService.search(null, new Script(ScriptType.INLINE, "test", "1+1", Collections.emptyMap()), randomFrom(scriptContexts)); Script script = new Script(ScriptType.INLINE, "test", "1+1", Collections.emptyMap());
CompiledScript compile = scriptService.compile(script, randomFrom(scriptContexts));
scriptService.search(null, compile, script.getParams());
assertEquals(1L, scriptService.stats().getCompilations()); assertEquals(1L, scriptService.stats().getCompilations());
} }
@ -388,39 +389,5 @@ public class ScriptServiceTests extends ESTestCase {
public void close() { public void close() {
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
}
public static class TestDangerousEngine implements ScriptEngine {
public static final String NAME = "dtest";
@Override
public String getType() {
return NAME;
}
@Override
public Object compile(String scriptName, String scriptSource, Map<String, String> params) {
return "compiled_" + scriptSource;
}
@Override
public ExecutableScript executable(final CompiledScript compiledScript, @Nullable Map<String, Object> vars) {
return null;
}
@Override
public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, @Nullable Map<String, Object> vars) {
return null;
}
@Override
public void close() {
}
} }
} }

View File

@ -588,10 +588,5 @@ public class AvgIT extends AbstractNumericTestCase {
} }
}; };
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
} }
} }

View File

@ -491,11 +491,6 @@ public class SumIT extends AbstractNumericTestCase {
} }
}; };
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
} }
/** /**
@ -597,10 +592,5 @@ public class SumIT extends AbstractNumericTestCase {
} }
}; };
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
} }
} }

View File

@ -344,10 +344,5 @@ public class ValueCountIT extends ESIntegTestCase {
} }
}; };
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
} }
} }

View File

@ -23,11 +23,9 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContextRegistry; import org.elasticsearch.script.ScriptContextRegistry;
import org.elasticsearch.script.ScriptEngineRegistry;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType; import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.ParsedAggregation; import org.elasticsearch.search.aggregations.ParsedAggregation;
@ -119,10 +117,9 @@ public class InternalScriptedMetricTests extends InternalAggregationTestCase<Int
Collections.singletonMap(REDUCE_SCRIPT_NAME, script -> { Collections.singletonMap(REDUCE_SCRIPT_NAME, script -> {
return ((List<Object>) script.get("_aggs")).size(); return ((List<Object>) script.get("_aggs")).size();
})); }));
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(scriptEngine));
ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList()); ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList());
try { try {
return new ScriptService(Settings.EMPTY, scriptEngineRegistry, scriptContextRegistry); return new ScriptService(Settings.EMPTY, Collections.singletonMap(scriptEngine.getType(), scriptEngine), scriptContextRegistry);
} catch (IOException e) { } catch (IOException e) {
throw new ElasticsearchException(e); throw new ElasticsearchException(e);
} }

View File

@ -27,7 +27,6 @@ import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.QueryShardContext;
@ -36,7 +35,7 @@ import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.ScoreAccessor; import org.elasticsearch.script.ScoreAccessor;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContextRegistry; import org.elasticsearch.script.ScriptContextRegistry;
import org.elasticsearch.script.ScriptEngineRegistry; import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType; import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.AggregatorTestCase;
@ -198,11 +197,11 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
protected QueryShardContext queryShardContextMock(MapperService mapperService, final MappedFieldType[] fieldTypes, protected QueryShardContext queryShardContextMock(MapperService mapperService, final MappedFieldType[] fieldTypes,
CircuitBreakerService circuitBreakerService) { CircuitBreakerService circuitBreakerService) {
MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME, SCRIPTS); MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME, SCRIPTS);
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(scriptEngine)); Map<String, ScriptEngine> engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine);
ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList()); ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList());
ScriptService scriptService; ScriptService scriptService;
try { try {
scriptService = new ScriptService(Settings.EMPTY, scriptEngineRegistry, scriptContextRegistry); scriptService = new ScriptService(Settings.EMPTY, engines, scriptContextRegistry);
} catch (IOException e) { } catch (IOException e) {
throw new ElasticsearchException(e); throw new ElasticsearchException(e);
} }

View File

@ -54,7 +54,7 @@ import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext; import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptContextRegistry; import org.elasticsearch.script.ScriptContextRegistry;
import org.elasticsearch.script.ScriptEngineRegistry; import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptServiceTests.TestEngine; import org.elasticsearch.script.ScriptServiceTests.TestEngine;
import org.elasticsearch.script.ScriptType; import org.elasticsearch.script.ScriptType;
@ -62,7 +62,6 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -89,8 +88,8 @@ public abstract class AbstractSortTestCase<T extends SortBuilder<T>> extends EST
.put(Environment.PATH_CONF_SETTING.getKey(), genericConfigFolder) .put(Environment.PATH_CONF_SETTING.getKey(), genericConfigFolder)
.build(); .build();
ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList()); ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList());
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(new TestEngine())); ScriptEngine engine = new TestEngine();
scriptService = new ScriptService(baseSettings, scriptEngineRegistry, scriptContextRegistry) { scriptService = new ScriptService(baseSettings, Collections.singletonMap(engine.getType(), engine), scriptContextRegistry) {
@Override @Override
public CompiledScript compile(Script script, ScriptContext scriptContext) { public CompiledScript compile(Script script, ScriptContext scriptContext) {
return new CompiledScript(ScriptType.INLINE, "mockName", "test", script); return new CompiledScript(ScriptType.INLINE, "mockName", "test", script);

View File

@ -1059,11 +1059,6 @@ public class SuggestSearchIT extends ESIntegTestCase {
public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map<String, Object> vars) { public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map<String, Object> vars) {
throw new UnsupportedOperationException("search script not supported"); throw new UnsupportedOperationException("search script not supported");
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
} }
public void testPhraseSuggesterCollate() throws InterruptedException, ExecutionException, IOException { public void testPhraseSuggesterCollate() throws InterruptedException, ExecutionException, IOException {

View File

@ -132,11 +132,6 @@ public class UpdateIT extends ESIntegTestCase {
public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map<String, Object> vars) { public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map<String, Object> vars) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
} }
public static class FieldIncrementScriptPlugin extends Plugin implements ScriptPlugin { public static class FieldIncrementScriptPlugin extends Plugin implements ScriptPlugin {
@ -193,11 +188,6 @@ public class UpdateIT extends ESIntegTestCase {
public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map<String, Object> vars) { public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map<String, Object> vars) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
} }
public static class ScriptedUpsertScriptPlugin extends Plugin implements ScriptPlugin { public static class ScriptedUpsertScriptPlugin extends Plugin implements ScriptPlugin {
@ -254,12 +244,6 @@ public class UpdateIT extends ESIntegTestCase {
public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map<String, Object> vars) { public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map<String, Object> vars) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
} }
public static class ExtractContextInSourceScriptPlugin extends Plugin implements ScriptPlugin { public static class ExtractContextInSourceScriptPlugin extends Plugin implements ScriptPlugin {
@ -317,11 +301,6 @@ public class UpdateIT extends ESIntegTestCase {
public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map<String, Object> vars) { public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map<String, Object> vars) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
} }
@Override @Override

View File

@ -105,7 +105,8 @@ thrown instead.
[[delete-index-creation]] [[delete-index-creation]]
=== Automatic index creation === Automatic index creation
The delete operation automatically creates an index if it has not been If an <<docs-index_,external versioning variant>> is used,
the delete operation automatically creates an index if it has not been
created before (check out the <<indices-create-index,create index API>> created before (check out the <<indices-create-index,create index API>>
for manually creating an index), and also automatically creates a for manually creating an index), and also automatically creates a
dynamic type mapping for the specific type if it has not been created dynamic type mapping for the specific type if it has not been created

View File

@ -44,3 +44,9 @@ The default value of the `allow_no_indices` option for the Open/Close index API
has been changed from `false` to `true` so it is aligned with the behaviour of the has been changed from `false` to `true` so it is aligned with the behaviour of the
Delete index API. As a result, Open/Close index API don't return an error by Delete index API. As a result, Open/Close index API don't return an error by
default when a provided wildcard expression doesn't match any closed/open index. default when a provided wildcard expression doesn't match any closed/open index.
==== Delete a document
Delete a document from non-existing index has been modified to not create the index.
However if an external versioning is used the index will be created and the document
will be marked for deletion.

View File

@ -26,6 +26,6 @@ now disallowed for these indices' mappings.
==== Unrecognized `match_mapping_type` options not silently ignored ==== Unrecognized `match_mapping_type` options not silently ignored
Previously Elastiscearch would silently ignore any dynamic templates that Previously Elasticsearch would silently ignore any dynamic templates that
included a `match_mapping_type` type that was unrecognized. An exception is now included a `match_mapping_type` type that was unrecognized. An exception is now
thrown on an unrecognized type. thrown on an unrecognized type.

View File

@ -94,6 +94,8 @@ thread_pool:
[float] [float]
==== `fixed_auto_queue_size` ==== `fixed_auto_queue_size`
experimental[]
The `fixed_auto_queue_size` thread pool holds a fixed size of threads to handle The `fixed_auto_queue_size` thread pool holds a fixed size of threads to handle
the requests with a bounded queue for pending requests that have no threads to the requests with a bounded queue for pending requests that have no threads to
service them. It's similar to the `fixed` threadpool, however, the `queue_size` service them. It's similar to the `fixed` threadpool, however, the `queue_size`

View File

@ -255,9 +255,4 @@ public class ExpressionScriptEngine extends AbstractComponent implements ScriptE
@Override @Override
public void close() {} public void close() {}
@Override
public boolean isInlineScriptEnabled() {
return true;
}
} }

View File

@ -142,9 +142,4 @@ public final class MustacheScriptEngine implements ScriptEngine {
return writer.toString(); return writer.toString();
} }
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
} }

View File

@ -264,9 +264,4 @@ public final class PainlessScriptEngine extends AbstractComponent implements Scr
private int getNextStatement(String scriptSource, int offset) { private int getNextStatement(String scriptSource, int offset) {
return Math.min(scriptSource.length(), offset + 25); return Math.min(scriptSource.length(), offset + 25);
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
} }

View File

@ -282,7 +282,7 @@ public class PercolatorFieldMapper extends FieldMapper {
); );
verifyQuery(queryBuilder); verifyQuery(queryBuilder);
// Fetching of terms, shapes and indexed scripts happen during this rewrite: // Fetching of terms, shapes and indexed scripts happen during this rewrite:
queryBuilder = queryBuilder.rewrite(queryShardContext); queryBuilder = QueryBuilder.rewriteQuery(queryBuilder, queryShardContext);
try (XContentBuilder builder = XContentFactory.contentBuilder(QUERY_BUILDER_CONTENT_TYPE)) { try (XContentBuilder builder = XContentFactory.contentBuilder(QUERY_BUILDER_CONTENT_TYPE)) {
queryBuilder.toXContent(builder, new MapParams(Collections.emptyMap())); queryBuilder.toXContent(builder, new MapParams(Collections.emptyMap()));

View File

@ -24,7 +24,7 @@ esplugin {
dependencies { dependencies {
compile "org.apache.lucene:lucene-analyzers-icu:${versions.lucene}" compile "org.apache.lucene:lucene-analyzers-icu:${versions.lucene}"
compile 'com.ibm.icu:icu4j:54.1' compile 'com.ibm.icu:icu4j:56.1'
} }
dependencyLicenses { dependencyLicenses {

View File

@ -1 +0,0 @@
3f66ecd5871467598bc81662817b80612a0a907f

View File

@ -0,0 +1 @@
8dd6671f52165a0419e6de5e1016400875a90fa9

View File

@ -133,11 +133,6 @@ public class ExpertScriptPlugin extends Plugin implements ScriptPlugin {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
@Override @Override
public void close() {} public void close() {}
} }

View File

@ -21,13 +21,14 @@ package org.elasticsearch.ingest;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.ScriptContextRegistry; import org.elasticsearch.script.ScriptContextRegistry;
import org.elasticsearch.script.ScriptEngineRegistry; import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.mustache.MustacheScriptEngine; import org.elasticsearch.script.mustache.MustacheScriptEngine;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
import java.util.Collections; import java.util.Collections;
import java.util.Map;
public abstract class AbstractScriptTestCase extends ESTestCase { public abstract class AbstractScriptTestCase extends ESTestCase {
@ -35,9 +36,10 @@ public abstract class AbstractScriptTestCase extends ESTestCase {
@Before @Before
public void init() throws Exception { public void init() throws Exception {
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(new MustacheScriptEngine())); MustacheScriptEngine engine = new MustacheScriptEngine();
Map<String, ScriptEngine> engines = Collections.singletonMap(engine.getType(), engine);
ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList()); ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList());
ScriptService scriptService = new ScriptService(Settings.EMPTY, scriptEngineRegistry, scriptContextRegistry); ScriptService scriptService = new ScriptService(Settings.EMPTY, engines, scriptContextRegistry);
templateService = new InternalTemplateService(scriptService); templateService = new InternalTemplateService(scriptService);
} }

View File

@ -94,12 +94,6 @@ public class MockScriptEngine implements ScriptEngine {
public void close() throws IOException { public void close() throws IOException {
} }
@Override
public boolean isInlineScriptEnabled() {
return true;
}
public class MockCompiledScript { public class MockCompiledScript {
private final String name; private final String name;