Scripting: split out compile limits and caching (#52498) (#52652)

Phase 1 of adding compilation limits per context.
* Refactor rate limiting and caching into separate class,
  `ScriptCache`,  which will be used per context.
* Disable compilation limit for certain tests.

Backport of 0866031
Refs: #50152
This commit is contained in:
Stuart Tettemer 2020-02-21 12:10:51 -07:00 committed by GitHub
parent 56efd8b44d
commit 376932a47d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 415 additions and 221 deletions

View File

@ -144,6 +144,8 @@ import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
@ -345,6 +347,7 @@ public class Node implements Closeable {
client = new NodeClient(settings, threadPool);
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
final ScriptService scriptService = newScriptService(settings, scriptModule.engines, scriptModule.contexts);
AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
// this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
// so we might be late here already
@ -357,20 +360,20 @@ public class Node implements Closeable {
final SettingsModule settingsModule =
new SettingsModule(settings, additionalSettings, additionalSettingsFilter, settingsUpgraders);
scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
scriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
resourcesToClose.add(resourceWatcherService);
final NetworkService networkService = new NetworkService(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.addStateApplier(scriptModule.getScriptService());
clusterService.addStateApplier(scriptService);
resourcesToClose.add(clusterService);
clusterService.addLocalNodeMasterListener(
new ConsistentSettingsService(settings, clusterService, settingsModule.getConsistentSettings())
.newHashPublisher());
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(),
scriptService, analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class), client);
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();
@ -449,7 +452,7 @@ public class Node implements Closeable {
final IndicesService indicesService =
new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptService,
clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories);
final AliasValidator aliasValidator = new AliasValidator();
@ -469,7 +472,7 @@ public class Node implements Closeable {
Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,
scriptService, xContentRegistry, environment, nodeEnvironment,
namedWriteableRegistry, clusterModule.getIndexNameExpressionResolver()).stream())
.collect(Collectors.toList());
@ -528,12 +531,12 @@ public class Node implements Closeable {
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService);
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
transportService, indicesService, pluginsService, circuitBreakerService, scriptService,
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
searchTransportService);
final SearchService searchService = newSearchService(clusterService, indicesService,
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
threadPool, scriptService, bigArrays, searchModule.getFetchPhase(),
responseCollectorService, circuitBreakerService);
final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
@ -563,7 +566,7 @@ public class Node implements Closeable {
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
b.bind(ScriptService.class).toInstance(scriptService);
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
b.bind(UsageService.class).toInstance(usageService);
@ -580,7 +583,7 @@ public class Node implements Closeable {
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptModule.getScriptService()));
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService));
b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);
b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
@ -1042,6 +1045,13 @@ public class Node implements Closeable {
scriptService, bigArrays, fetchPhase, responseCollectorService, circuitBreakerService);
}
/**
* Creates a new the ScriptService. This method can be overwritten by tests to inject mock implementations.
*/
protected ScriptService newScriptService(Settings settings, Map<String, ScriptEngine> engines, Map<String, ScriptContext<?>> contexts) {
return new ScriptService(settings, engines, contexts);
}
/**
* Get Custom Name Resolvers list based on a Discovery Plugins list
* @param discoveryPlugins Discovery plugins list

View File

@ -0,0 +1,252 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.script;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Map;
import java.util.Objects;
/**
* Script cache and compilation rate limiter.
*/
public class ScriptCache {
private static final Logger logger = LogManager.getLogger(ScriptService.class);
private Cache<CacheKey, Object> cache;
private final ScriptMetrics scriptMetrics = new ScriptMetrics();
private final Object lock = new Object();
private Tuple<Integer, TimeValue> rate;
private long lastInlineCompileTime;
private double scriptsPerTimeWindow;
private double compilesAllowedPerNano;
// Cache settings
private int cacheSize;
private TimeValue cacheExpire;
public ScriptCache(
int cacheMaxSize,
TimeValue cacheExpire,
Tuple<Integer, TimeValue> maxCompilationRate
) {
CacheBuilder<CacheKey, Object> cacheBuilder = CacheBuilder.builder();
if (cacheMaxSize >= 0) {
cacheBuilder.setMaximumWeight(cacheMaxSize);
}
if (cacheExpire.getNanos() != 0) {
cacheBuilder.setExpireAfterAccess(cacheExpire);
}
logger.debug("using script cache with max_size [{}], expire [{}]", cacheMaxSize, cacheExpire);
this.cache = cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build();
this.lastInlineCompileTime = System.nanoTime();
this.cacheSize = cacheMaxSize;
this.cacheExpire = cacheExpire;
this.setMaxCompilationRate(maxCompilationRate);
}
private Cache<CacheKey,Object> buildCache() {
CacheBuilder<CacheKey, Object> cacheBuilder = CacheBuilder.builder();
if (cacheSize >= 0) {
cacheBuilder.setMaximumWeight(cacheSize);
}
if (cacheExpire.getNanos() != 0) {
cacheBuilder.setExpireAfterAccess(cacheExpire);
}
return cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build();
}
<FactoryType> FactoryType compile(
ScriptContext<FactoryType> context,
ScriptEngine scriptEngine,
String id,
String idOrCode,
ScriptType type,
Map<String, String> options
) {
String lang = scriptEngine.getType();
CacheKey cacheKey = new CacheKey(lang, idOrCode, context.name, options);
Object compiledScript = cache.get(cacheKey);
if (compiledScript != null) {
return context.factoryClazz.cast(compiledScript);
}
// Synchronize so we don't compile scripts many times during multiple shards all compiling a script
synchronized (lock) {
// Retrieve it again in case it has been put by a different thread
compiledScript = cache.get(cacheKey);
if (compiledScript == null) {
try {
// Either an un-cached inline script or indexed script
// If the script type is inline the name will be the same as the code for identification in exceptions
// but give the script engine the chance to be better, give it separate name + source code
// for the inline case, then its anonymous: null.
if (logger.isTraceEnabled()) {
logger.trace("context [{}]: compiling script, type: [{}], lang: [{}], options: [{}]", context.name, type,
lang, options);
}
// Check whether too many compilations have happened
checkCompilationLimit();
compiledScript = scriptEngine.compile(id, idOrCode, context, options);
} catch (ScriptException good) {
// TODO: remove this try-catch completely, when all script engines have good exceptions!
throw good; // its already good
} catch (Exception exception) {
throw new GeneralScriptException("Failed to compile " + type + " script [" + id + "] using lang [" + lang + "]",
exception);
}
// Since the cache key is the script content itself we don't need to
// invalidate/check the cache if an indexed script changes.
scriptMetrics.onCompilation();
cache.put(cacheKey, compiledScript);
}
}
return context.factoryClazz.cast(compiledScript);
}
public ScriptStats stats() {
return scriptMetrics.stats();
}
/**
* Check whether there have been too many compilations within the last minute, throwing a circuit breaking exception if so.
* This is a variant of the token bucket algorithm: https://en.wikipedia.org/wiki/Token_bucket
*
* It can be thought of as a bucket with water, every time the bucket is checked, water is added proportional to the amount of time that
* elapsed since the last time it was checked. If there is enough water, some is removed and the request is allowed. If there is not
* enough water the request is denied. Just like a normal bucket, if water is added that overflows the bucket, the extra water/capacity
* is discarded - there can never be more water in the bucket than the size of the bucket.
*/
void checkCompilationLimit() {
if (rate.v1() == 0 && rate.v2().getNanos() == 0) {
// unlimited
return;
}
long now = System.nanoTime();
long timePassed = now - lastInlineCompileTime;
lastInlineCompileTime = now;
scriptsPerTimeWindow += (timePassed) * compilesAllowedPerNano;
// It's been over the time limit anyway, readjust the bucket to be level
if (scriptsPerTimeWindow > rate.v1()) {
scriptsPerTimeWindow = rate.v1();
}
// If there is enough tokens in the bucket, allow the request and decrease the tokens by 1
if (scriptsPerTimeWindow >= 1) {
scriptsPerTimeWindow -= 1.0;
} else {
scriptMetrics.onCompilationLimit();
// Otherwise reject the request
throw new CircuitBreakingException("[script] Too many dynamic script compilations within, max: [" +
rate.v1() + "/" + rate.v2() +"]; please use indexed, or scripts with parameters instead; " +
"this limit can be changed by the [script.max_compilations_rate] setting",
CircuitBreaker.Durability.TRANSIENT);
}
}
/**
* This configures the maximum script compilations per five minute window.
*
* @param newRate the new expected maximum number of compilations per five minute window
*/
void setMaxCompilationRate(Tuple<Integer, TimeValue> newRate) {
synchronized (lock) {
this.rate = newRate;
// Reset the counter to allow new compilations
this.scriptsPerTimeWindow = rate.v1();
this.compilesAllowedPerNano = ((double) rate.v1()) / newRate.v2().nanos();
this.cache = buildCache();
}
}
/**
* A small listener for the script cache that calls each
* {@code ScriptEngine}'s {@code scriptRemoved} method when the
* script has been removed from the cache
*/
private class ScriptCacheRemovalListener implements RemovalListener<CacheKey, Object> {
@Override
public void onRemoval(RemovalNotification<CacheKey, Object> notification) {
if (logger.isDebugEnabled()) {
logger.debug(
"removed [{}] from cache, reason: [{}]",
notification.getValue(),
notification.getRemovalReason()
);
}
scriptMetrics.onCacheEviction();
}
}
private static final class CacheKey {
final String lang;
final String idOrCode;
final String context;
final Map<String, String> options;
private CacheKey(String lang, String idOrCode, String context, Map<String, String> options) {
this.lang = lang;
this.idOrCode = idOrCode;
this.context = context;
this.options = options;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheKey cacheKey = (CacheKey) o;
return Objects.equals(lang, cacheKey.lang) &&
Objects.equals(idOrCode, cacheKey.idOrCode) &&
Objects.equals(context, cacheKey.context) &&
Objects.equals(options, cacheKey.options);
}
@Override
public int hashCode() {
return Objects.hash(lang, idOrCode, context, options);
}
}
}

View File

@ -66,7 +66,8 @@ public class ScriptModule {
).collect(Collectors.toMap(c -> c.name, Function.identity()));
}
private final ScriptService scriptService;
public final Map<String, ScriptEngine> engines;
public final Map<String, ScriptContext<?>> contexts;
public ScriptModule(Settings settings, List<ScriptPlugin> scriptPlugins) {
Map<String, ScriptEngine> engines = new HashMap<>();
@ -89,20 +90,14 @@ public class ScriptModule {
}
}
}
scriptService = new ScriptService(settings, Collections.unmodifiableMap(engines), Collections.unmodifiableMap(contexts));
}
/**
* Service responsible for managing scripts.
*/
public ScriptService getScriptService() {
return scriptService;
this.engines = Collections.unmodifiableMap(engines);
this.contexts = Collections.unmodifiableMap(contexts);
}
/**
* Allow the script service to register any settings update handlers on the cluster settings
*/
public void registerClusterSettingsListeners(ClusterSettings clusterSettings) {
public void registerClusterSettingsListeners(ScriptService scriptService, ClusterSettings clusterSettings) {
scriptService.registerClusterSettingsListeners(clusterSettings);
}
}

View File

@ -34,12 +34,6 @@ import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@ -119,22 +113,15 @@ public class ScriptService implements Closeable, ClusterStateApplier {
private final Set<String> typesAllowed;
private final Set<String> contextsAllowed;
final ScriptCache compiler;
private final Map<String, ScriptEngine> engines;
private final Map<String, ScriptContext<?>> contexts;
private final Cache<CacheKey, Object> cache;
private final ScriptMetrics scriptMetrics = new ScriptMetrics();
private ClusterState clusterState;
private int maxSizeInBytes;
private Tuple<Integer, TimeValue> rate;
private long lastInlineCompileTime;
private double scriptsPerTimeWindow;
private double compilesAllowedPerNano;
public ScriptService(Settings settings, Map<String, ScriptEngine> engines, Map<String, ScriptContext<?>> contexts) {
this.settings = Objects.requireNonNull(settings);
this.engines = Objects.requireNonNull(engines);
@ -213,29 +200,26 @@ public class ScriptService implements Closeable, ClusterStateApplier {
}
}
int cacheMaxSize = SCRIPT_CACHE_SIZE_SETTING.get(settings);
CacheBuilder<CacheKey, Object> cacheBuilder = CacheBuilder.builder();
if (cacheMaxSize >= 0) {
cacheBuilder.setMaximumWeight(cacheMaxSize);
}
TimeValue cacheExpire = SCRIPT_CACHE_EXPIRE_SETTING.get(settings);
if (cacheExpire.getNanos() != 0) {
cacheBuilder.setExpireAfterAccess(cacheExpire);
}
logger.debug("using script cache with max_size [{}], expire [{}]", cacheMaxSize, cacheExpire);
this.cache = cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build();
this.lastInlineCompileTime = System.nanoTime();
this.setMaxSizeInBytes(SCRIPT_MAX_SIZE_IN_BYTES.get(settings));
this.setMaxCompilationRate(SCRIPT_MAX_COMPILATIONS_RATE.get(settings));
compiler = new ScriptCache(
SCRIPT_CACHE_SIZE_SETTING.get(settings),
SCRIPT_CACHE_EXPIRE_SETTING.get(settings),
compilationLimitsEnabled() ?
SCRIPT_MAX_COMPILATIONS_RATE.get(settings):
new Tuple<>(0, TimeValue.ZERO)
);
}
/**
* This is overridden in tests to disable compilation rate limiting.
*/
boolean compilationLimitsEnabled() {
return true;
}
void registerClusterSettingsListeners(ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(SCRIPT_MAX_SIZE_IN_BYTES, this::setMaxSizeInBytes);
clusterSettings.addSettingsUpdateConsumer(SCRIPT_MAX_COMPILATIONS_RATE, this::setMaxCompilationRate);
clusterSettings.addSettingsUpdateConsumer(SCRIPT_MAX_COMPILATIONS_RATE, compiler::setMaxCompilationRate);
}
@Override
@ -267,19 +251,7 @@ public class ScriptService implements Closeable, ClusterStateApplier {
maxSizeInBytes = newMaxSizeInBytes;
}
/**
* This configures the maximum script compilations per five minute window.
*
* @param newRate the new expected maximum number of compilations per five minute window
*/
void setMaxCompilationRate(Tuple<Integer, TimeValue> newRate) {
this.rate = newRate;
// Reset the counter to allow new compilations
this.scriptsPerTimeWindow = rate.v1();
this.compilesAllowedPerNano = ((double) rate.v1()) / newRate.v2().nanos();
}
/**
/*
* Compiles a script using the given context.
*
* @return a compiled script which may be used to construct instances of a script for the given context
@ -332,80 +304,7 @@ public class ScriptService implements Closeable, ClusterStateApplier {
logger.trace("compiling lang: [{}] type: [{}] script: {}", lang, type, idOrCode);
}
CacheKey cacheKey = new CacheKey(lang, idOrCode, context.name, options);
Object compiledScript = cache.get(cacheKey);
if (compiledScript != null) {
return context.factoryClazz.cast(compiledScript);
}
// Synchronize so we don't compile scripts many times during multiple shards all compiling a script
synchronized (this) {
// Retrieve it again in case it has been put by a different thread
compiledScript = cache.get(cacheKey);
if (compiledScript == null) {
try {
// Either an un-cached inline script or indexed script
// If the script type is inline the name will be the same as the code for identification in exceptions
// but give the script engine the chance to be better, give it separate name + source code
// for the inline case, then its anonymous: null.
if (logger.isTraceEnabled()) {
logger.trace("compiling script, type: [{}], lang: [{}], options: [{}]", type, lang, options);
}
// Check whether too many compilations have happened
checkCompilationLimit();
compiledScript = scriptEngine.compile(id, idOrCode, context, options);
} catch (ScriptException good) {
// TODO: remove this try-catch completely, when all script engines have good exceptions!
throw good; // its already good
} catch (Exception exception) {
throw new GeneralScriptException("Failed to compile " + type + " script [" + id + "] using lang [" + lang + "]",
exception);
}
// Since the cache key is the script content itself we don't need to
// invalidate/check the cache if an indexed script changes.
scriptMetrics.onCompilation();
cache.put(cacheKey, compiledScript);
}
return context.factoryClazz.cast(compiledScript);
}
}
/**
* Check whether there have been too many compilations within the last minute, throwing a circuit breaking exception if so.
* This is a variant of the token bucket algorithm: https://en.wikipedia.org/wiki/Token_bucket
*
* It can be thought of as a bucket with water, every time the bucket is checked, water is added proportional to the amount of time that
* elapsed since the last time it was checked. If there is enough water, some is removed and the request is allowed. If there is not
* enough water the request is denied. Just like a normal bucket, if water is added that overflows the bucket, the extra water/capacity
* is discarded - there can never be more water in the bucket than the size of the bucket.
*/
void checkCompilationLimit() {
long now = System.nanoTime();
long timePassed = now - lastInlineCompileTime;
lastInlineCompileTime = now;
scriptsPerTimeWindow += (timePassed) * compilesAllowedPerNano;
// It's been over the time limit anyway, readjust the bucket to be level
if (scriptsPerTimeWindow > rate.v1()) {
scriptsPerTimeWindow = rate.v1();
}
// If there is enough tokens in the bucket, allow the request and decrease the tokens by 1
if (scriptsPerTimeWindow >= 1) {
scriptsPerTimeWindow -= 1.0;
} else {
scriptMetrics.onCompilationLimit();
// Otherwise reject the request
throw new CircuitBreakingException("[script] Too many dynamic script compilations within, max: [" +
rate.v1() + "/" + rate.v2() +"]; please use indexed, or scripts with parameters instead; " +
"this limit can be changed by the [" + SCRIPT_MAX_COMPILATIONS_RATE.getKey() + "] setting",
CircuitBreaker.Durability.TRANSIENT);
}
return compiler.compile(context, scriptEngine, id, idOrCode, type, options);
}
public boolean isLangSupported(String lang) {
@ -569,56 +468,11 @@ public class ScriptService implements Closeable, ClusterStateApplier {
}
public ScriptStats stats() {
return scriptMetrics.stats();
return compiler.stats();
}
@Override
public void applyClusterState(ClusterChangedEvent event) {
clusterState = event.state();
}
/**
* A small listener for the script cache that calls each
* {@code ScriptEngine}'s {@code scriptRemoved} method when the
* script has been removed from the cache
*/
private class ScriptCacheRemovalListener implements RemovalListener<CacheKey, Object> {
@Override
public void onRemoval(RemovalNotification<CacheKey, Object> notification) {
if (logger.isDebugEnabled()) {
logger.debug("removed {} from cache, reason: {}", notification.getValue(), notification.getRemovalReason());
}
scriptMetrics.onCacheEviction();
}
}
private static final class CacheKey {
final String lang;
final String idOrCode;
final String context;
final Map<String, String> options;
private CacheKey(String lang, String idOrCode, String context, Map<String, String> options) {
this.lang = lang;
this.idOrCode = idOrCode;
this.context = context;
this.options = options;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheKey cacheKey = (CacheKey) o;
return Objects.equals(lang, cacheKey.lang) &&
Objects.equals(idOrCode, cacheKey.idOrCode) &&
Objects.equals(context, cacheKey.context) &&
Objects.equals(options, cacheKey.options);
}
@Override
public int hashCode() {
return Objects.hash(lang, idOrCode, context, options);
}
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.script;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
public class ScriptCacheTests extends ESTestCase {
// even though circuit breaking is allowed to be configured per minute, we actually weigh this over five minutes
// simply by multiplying by five, so even setting it to one, requires five compilations to break
public void testCompilationCircuitBreaking() throws Exception {
ScriptCache cache = new ScriptCache(
ScriptService.SCRIPT_CACHE_SIZE_SETTING.get(Settings.EMPTY),
ScriptService.SCRIPT_CACHE_EXPIRE_SETTING.get(Settings.EMPTY),
ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.get(Settings.EMPTY)
);
cache.setMaxCompilationRate(Tuple.tuple(1, TimeValue.timeValueMinutes(1)));
cache.checkCompilationLimit(); // should pass
expectThrows(CircuitBreakingException.class, () -> cache.checkCompilationLimit());
cache.setMaxCompilationRate(Tuple.tuple(2, TimeValue.timeValueMinutes(1)));
cache.checkCompilationLimit(); // should pass
cache.checkCompilationLimit(); // should pass
expectThrows(CircuitBreakingException.class, () -> cache.checkCompilationLimit());
int count = randomIntBetween(5, 50);
cache.setMaxCompilationRate(Tuple.tuple(count, TimeValue.timeValueMinutes(1)));
for (int i = 0; i < count; i++) {
cache.checkCompilationLimit(); // should pass
}
expectThrows(CircuitBreakingException.class, () -> cache.checkCompilationLimit());
cache.setMaxCompilationRate(Tuple.tuple(0, TimeValue.timeValueMinutes(1)));
expectThrows(CircuitBreakingException.class, () -> cache.checkCompilationLimit());
cache.setMaxCompilationRate(Tuple.tuple(Integer.MAX_VALUE, TimeValue.timeValueMinutes(1)));
int largeLimit = randomIntBetween(1000, 10000);
for (int i = 0; i < largeLimit; i++) {
cache.checkCompilationLimit();
}
}
}

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptReque
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
@ -61,7 +60,6 @@ public class ScriptServiceTests extends ESTestCase {
public void setup() throws IOException {
baseSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "10000/1m")
.build();
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
for (int i = 0; i < 20; ++i) {
@ -98,32 +96,6 @@ public class ScriptServiceTests extends ESTestCase {
scriptService.registerClusterSettingsListeners(clusterSettings);
}
// even though circuit breaking is allowed to be configured per minute, we actually weigh this over five minutes
// simply by multiplying by five, so even setting it to one, requires five compilations to break
public void testCompilationCircuitBreaking() throws Exception {
buildScriptService(Settings.EMPTY);
scriptService.setMaxCompilationRate(Tuple.tuple(1, TimeValue.timeValueMinutes(1)));
scriptService.checkCompilationLimit(); // should pass
expectThrows(CircuitBreakingException.class, () -> scriptService.checkCompilationLimit());
scriptService.setMaxCompilationRate(Tuple.tuple(2, TimeValue.timeValueMinutes(1)));
scriptService.checkCompilationLimit(); // should pass
scriptService.checkCompilationLimit(); // should pass
expectThrows(CircuitBreakingException.class, () -> scriptService.checkCompilationLimit());
int count = randomIntBetween(5, 50);
scriptService.setMaxCompilationRate(Tuple.tuple(count, TimeValue.timeValueMinutes(1)));
for (int i = 0; i < count; i++) {
scriptService.checkCompilationLimit(); // should pass
}
expectThrows(CircuitBreakingException.class, () -> scriptService.checkCompilationLimit());
scriptService.setMaxCompilationRate(Tuple.tuple(0, TimeValue.timeValueMinutes(1)));
expectThrows(CircuitBreakingException.class, () -> scriptService.checkCompilationLimit());
scriptService.setMaxCompilationRate(Tuple.tuple(Integer.MAX_VALUE, TimeValue.timeValueMinutes(1)));
int largeLimit = randomIntBetween(1000, 10000);
for (int i = 0; i < largeLimit; i++) {
scriptService.checkCompilationLimit();
}
}
public void testMaxCompilationRateSetting() throws Exception {
assertThat(MAX_COMPILATION_RATE_FUNCTION.apply("10/1m"), is(Tuple.tuple(10, TimeValue.timeValueMinutes(1))));
assertThat(MAX_COMPILATION_RATE_FUNCTION.apply("10/60s"), is(Tuple.tuple(10, TimeValue.timeValueMinutes(1))));

View File

@ -38,6 +38,9 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.MockScriptService;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.MockSearchService;
import org.elasticsearch.search.SearchService;
@ -52,6 +55,7 @@ import org.elasticsearch.transport.TransportService;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@ -133,6 +137,14 @@ public class MockNode extends Node {
bigArrays, fetchPhase, circuitBreakerService);
}
@Override
protected ScriptService newScriptService(Settings settings, Map<String, ScriptEngine> engines, Map<String, ScriptContext<?>> contexts) {
if (getPluginsService().filterPlugins(MockScriptService.TestPlugin.class).isEmpty()) {
return super.newScriptService(settings, engines, contexts);
}
return new MockScriptService(settings, engines, contexts);
}
@Override
protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
TransportInterceptor interceptor,

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.script;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.plugins.Plugin;
import java.util.Map;
public class MockScriptService extends ScriptService {
/**
* Marker plugin used by {@link MockNode} to enable {@link MockScriptService}.
*/
public static class TestPlugin extends Plugin {}
public MockScriptService(Settings settings, Map<String, ScriptEngine> engines, Map<String, ScriptContext<?>> contexts) {
super(settings, engines, contexts);
}
@Override
boolean compilationLimitsEnabled() {
return false;
}
}

View File

@ -69,6 +69,7 @@ import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.MockScriptService;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.script.ScriptModule;
@ -353,7 +354,7 @@ public abstract class AbstractBuilderTestCase extends ESTestCase {
idxSettings = IndexSettingsModule.newIndexSettings(index, indexSettings, indexScopedSettings);
AnalysisModule analysisModule = new AnalysisModule(TestEnvironment.newEnvironment(nodeSettings), emptyList());
IndexAnalyzers indexAnalyzers = analysisModule.getAnalysisRegistry().build(idxSettings);
scriptService = scriptModule.getScriptService();
scriptService = new MockScriptService(Settings.EMPTY, scriptModule.engines, scriptModule.contexts);
similarityService = new SimilarityService(idxSettings, null, Collections.emptyMap());
MapperRegistry mapperRegistry = indicesModule.getMapperRegistry();
mapperService = new MapperService(idxSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,

View File

@ -126,8 +126,8 @@ import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.script.MockScriptService;
import org.elasticsearch.script.ScriptMetaData;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.MockSearchService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchService;
@ -1719,7 +1719,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b")
.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "2048/1m")
// by default we never cache below 10k docs in a segment,
// bypass this limit so that caching gets some testing in
// integration tests that usually create few documents
@ -1941,6 +1940,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
mocks.add(TestSeedPlugin.class);
mocks.add(AssertActionNamePlugin.class);
mocks.add(MockScriptService.TestPlugin.class);
return Collections.unmodifiableList(mocks);
}

View File

@ -50,7 +50,7 @@ import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.MockScriptService;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.transport.TransportSettings;
import org.junit.AfterClass;
@ -204,7 +204,6 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
// This needs to tie into the ESIntegTestCase#indexSettings() method
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent())
.put(Node.NODE_NAME_SETTING.getKey(), nodeName)
.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m")
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
.put("transport.type", getTestTransportType())
.put(TransportSettings.PORT.getKey(), ESTestCase.getPortRange())
@ -230,6 +229,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
if (addMockHttpTransport()) {
plugins.add(MockHttpTransport.TestPlugin.class);
}
plugins.add(MockScriptService.TestPlugin.class);
Node node = new MockNode(settings, plugins, forbidPrivateIndexSettings());
try {
node.start();

View File

@ -379,8 +379,6 @@ public final class InternalTestCluster extends TestCluster {
builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b");
builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b");
builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b");
// Some tests make use of scripting quite a bit, so increase the limit for integration tests
builder.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m");
builder.put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), random.nextBoolean());
if (TEST_NIGHTLY) {
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(),