Merge branch 'master' into rest_handler_client

Ryan Ernst 2016-06-30 08:16:25 -07:00
commit c762e7aa15
219 changed files with 2677 additions and 3269 deletions

View File

@ -1168,7 +1168,6 @@
<suppress files="modules[/\\]percolator[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]TransportMultiPercolateAction.java" checks="LineLength" />
<suppress files="modules[/\\]percolator[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]TransportPercolateAction.java" checks="LineLength" />
<suppress files="modules[/\\]percolator[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]TransportShardMultiPercolateAction.java" checks="LineLength" />
<suppress files="modules[/\\]percolator[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]RestPercolateAction.java" checks="LineLength" />
<suppress files="modules[/\\]percolator[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]MultiPercolatorIT.java" checks="LineLength" />
<suppress files="modules[/\\]percolator[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]PercolatorIT.java" checks="LineLength" />
<suppress files="modules[/\\]percolator[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]MultiPercolatorRequestTests.java" checks="LineLength" />
@ -1180,10 +1179,10 @@
<suppress files="plugins[/\\]analysis-kuromoji[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]analysis[/\\]JapaneseStopTokenFilterFactory.java" checks="LineLength" />
<suppress files="plugins[/\\]analysis-kuromoji[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]analysis[/\\]KuromojiAnalysisTests.java" checks="LineLength" />
<suppress files="plugins[/\\]analysis-phonetic[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]analysis[/\\]PhoneticTokenFilterFactory.java" checks="LineLength" />
<suppress files="plugins[/\\]discovery-azure[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cloud[/\\]azure[/\\]AbstractAzureTestCase.java" checks="LineLength" />
<suppress files="plugins[/\\]discovery-azure[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureMinimumMasterNodesTests.java" checks="LineLength" />
<suppress files="plugins[/\\]discovery-azure[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureSimpleTests.java" checks="LineLength" />
<suppress files="plugins[/\\]discovery-azure[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureTwoStartedNodesTests.java" checks="LineLength" />
<suppress files="plugins[/\\]discovery-azure-classic[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cloud[/\\]azure[/\\]AbstractAzureTestCase.java" checks="LineLength" />
<suppress files="plugins[/\\]discovery-azure-classic[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureMinimumMasterNodesTests.java" checks="LineLength" />
<suppress files="plugins[/\\]discovery-azure-classic[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureSimpleTests.java" checks="LineLength" />
<suppress files="plugins[/\\]discovery-azure-classic[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureTwoStartedNodesTests.java" checks="LineLength" />
<suppress files="plugins[/\\]discovery-ec2[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cloud[/\\]aws[/\\]AbstractAwsTestCase.java" checks="LineLength" />
<suppress files="plugins[/\\]discovery-ec2[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]ec2[/\\]AmazonEC2Mock.java" checks="LineLength" />
<suppress files="plugins[/\\]discovery-gce[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]gce[/\\]GceNetworkTests.java" checks="LineLength" />

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TcpTransport;
import java.io.IOException;
import java.util.ArrayList;
@ -496,7 +497,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.index.shard.IndexShardStartedException::new, 23),
SEARCH_CONTEXT_MISSING_EXCEPTION(org.elasticsearch.search.SearchContextMissingException.class,
org.elasticsearch.search.SearchContextMissingException::new, 24),
GENERAL_SCRIPT_EXCEPTION(org.elasticsearch.script.GeneralScriptException.class,
org.elasticsearch.script.GeneralScriptException::new, 25),
BATCH_OPERATION_EXCEPTION(org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException.class,
org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException::new, 26),
@ -676,8 +677,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.indices.IndexAlreadyExistsException::new, 123),
SCRIPT_PARSE_EXCEPTION(org.elasticsearch.script.Script.ScriptParseException.class,
org.elasticsearch.script.Script.ScriptParseException::new, 124),
HTTP_ON_TRANSPORT_EXCEPTION(org.elasticsearch.transport.netty.SizeHeaderFrameDecoder.HttpOnTransportException.class,
org.elasticsearch.transport.netty.SizeHeaderFrameDecoder.HttpOnTransportException::new, 125),
HTTP_ON_TRANSPORT_EXCEPTION(TcpTransport.HttpOnTransportException.class,
TcpTransport.HttpOnTransportException::new, 125),
MAPPER_PARSING_EXCEPTION(org.elasticsearch.index.mapper.MapperParsingException.class,
org.elasticsearch.index.mapper.MapperParsingException::new, 126),
SEARCH_CONTEXT_EXCEPTION(org.elasticsearch.search.SearchContextException.class,

View File

@ -24,7 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import java.io.IOException;
@ -91,7 +91,7 @@ public class SimulateProcessorResult implements Writeable, ToXContent {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (processorTag != null) {
builder.field(AbstractProcessorFactory.TAG_KEY, processorTag);
builder.field(ConfigurationUtils.TAG_KEY, processorTag);
}
if (failure == null) {
ingestDocument.toXContent(builder, params);

View File

@ -251,8 +251,7 @@ public class UpdateHelper extends AbstractComponent {
private Map<String, Object> executeScript(Script script, Map<String, Object> ctx) {
try {
if (scriptService != null) {
ClusterState state = clusterService.state();
ExecutableScript executableScript = scriptService.executable(script, ScriptContext.Standard.UPDATE, Collections.emptyMap(), state);
ExecutableScript executableScript = scriptService.executable(script, ScriptContext.Standard.UPDATE, Collections.emptyMap());
executableScript.setNextVar("ctx", ctx);
executableScript.run();
// we need to unwrap the ctx...

View File

@ -50,8 +50,8 @@ import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty.NettyTransport;
import java.io.Closeable;
import java.util.ArrayList;
@ -107,7 +107,7 @@ public class TransportClient extends AbstractClient {
private PluginsService newPluginService(final Settings settings) {
final Settings.Builder settingsBuilder = Settings.builder()
.put(NettyTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
.put(InternalSettingsPreparer.prepareSettings(settings))
.put(NetworkService.NETWORK_SERVER.getKey(), false)
.put(CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE);

View File

@ -20,7 +20,6 @@
package org.elasticsearch.common.io;
import org.elasticsearch.common.SuppressForbidden;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.EOFException;
import java.io.IOException;
@ -159,25 +158,6 @@ public final class Channels {
return bytesRead;
}
/**
* Copies bytes from source {@link org.jboss.netty.buffer.ChannelBuffer} to a {@link java.nio.channels.GatheringByteChannel}
*
* @param source ChannelBuffer to copy from
* @param sourceIndex index in <i>source</i> to start copying from
* @param length how many bytes to copy
* @param channel target GatheringByteChannel
*/
public static void writeToChannel(ChannelBuffer source, int sourceIndex, int length, GatheringByteChannel channel) throws IOException {
while (length > 0) {
int written = source.getBytes(sourceIndex, channel, length);
sourceIndex += written;
length -= written;
}
assert length == 0;
}
/**
* Writes part of a byte array to a {@link java.nio.channels.WritableByteChannel}
*

View File

@ -1,37 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.netty;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
/**
* A marker to not remove frame decoder from the resulting jar so plugins can use it.
*/
public class KeepFrameDecoder extends FrameDecoder {
public static final KeepFrameDecoder decoder = new KeepFrameDecoder();
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
return null;
}
}

View File

@ -1,42 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.netty;
import org.elasticsearch.common.lease.Releasable;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
/**
* A channel listener that releases a {@link org.elasticsearch.common.lease.Releasable} when
* the operation is complete.
*/
public class ReleaseChannelFutureListener implements ChannelFutureListener {
private final Releasable releasable;
public ReleaseChannelFutureListener(Releasable releasable) {
this.releasable = releasable;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
releasable.close();
}
}

View File

@ -222,6 +222,11 @@ public abstract class TimeZoneRounding extends Rounding {
long roundedUTC;
if (isInDSTGap(rounded) == false) {
roundedUTC = timeZone.convertLocalToUTC(rounded, true, utcMillis);
// check if we crossed DST transition, in this case we want the last rounded value before the transition
long transition = timeZone.previousTransition(utcMillis);
if (transition != utcMillis && transition > roundedUTC) {
roundedUTC = roundKey(transition - 1);
}
} else {
/*
* Edge case where the rounded local time is illegal and landed

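A minimal sketch of how the guard above composes Joda-Time's previousTransition with the three-argument convertLocalToUTC (which resolves ambiguous local times using the offset in effect at the original instant); day-level flooring stands in for the surrounding rounding classes, and the zone and date are illustrative assumptions:

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

class DstRoundingSketch {
    private static final long DAY = 86_400_000L;

    public static void main(String[] args) {
        DateTimeZone tz = DateTimeZone.forID("Europe/Berlin");
        // 2014-10-26: clocks fall back from 03:00 CEST to 02:00 CET at 01:00 UTC
        long utcMillis = new DateTime(2014, 10, 26, 12, 0, 0, DateTimeZone.UTC).getMillis();
        long roundedLocal = (tz.convertUTCToLocal(utcMillis) / DAY) * DAY; // floor to local day
        long roundedUTC = tz.convertLocalToUTC(roundedLocal, true, utcMillis);
        // the added check: if a transition occurred between the rounded value and
        // the input, re-round from just before the transition so the result cannot
        // land past it (in this particular zone and width both paths agree)
        long transition = tz.previousTransition(utcMillis);
        if (transition != utcMillis && transition > roundedUTC) {
            long lastLocal = (tz.convertUTCToLocal(transition - 1) / DAY) * DAY;
            roundedUTC = tz.convertLocalToUTC(lastLocal, true, transition - 1);
        }
        System.out.println(new DateTime(roundedUTC, tz)); // 2014-10-26T00:00:00.000+02:00
    }
}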
View File

@ -89,6 +89,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings;
@ -279,14 +280,14 @@ public final class ClusterSettings extends AbstractScopedSettings {
TransportSettings.PUBLISH_PORT,
TransportSettings.PORT,
NettyTransport.WORKER_COUNT,
NettyTransport.CONNECTIONS_PER_NODE_RECOVERY,
NettyTransport.CONNECTIONS_PER_NODE_BULK,
NettyTransport.CONNECTIONS_PER_NODE_REG,
NettyTransport.CONNECTIONS_PER_NODE_STATE,
NettyTransport.CONNECTIONS_PER_NODE_PING,
NettyTransport.PING_SCHEDULE,
NettyTransport.TCP_BLOCKING_CLIENT,
NettyTransport.TCP_CONNECT_TIMEOUT,
TcpTransport.CONNECTIONS_PER_NODE_RECOVERY,
TcpTransport.CONNECTIONS_PER_NODE_BULK,
TcpTransport.CONNECTIONS_PER_NODE_REG,
TcpTransport.CONNECTIONS_PER_NODE_STATE,
TcpTransport.CONNECTIONS_PER_NODE_PING,
TcpTransport.PING_SCHEDULE,
TcpTransport.TCP_BLOCKING_CLIENT,
TcpTransport.TCP_CONNECT_TIMEOUT,
NettyTransport.NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
NettyTransport.NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
NettyTransport.NETTY_RECEIVE_PREDICTOR_SIZE,
@ -294,12 +295,12 @@ public final class ClusterSettings extends AbstractScopedSettings {
NettyTransport.NETTY_RECEIVE_PREDICTOR_MAX,
NetworkService.NETWORK_SERVER,
NettyTransport.NETTY_BOSS_COUNT,
NettyTransport.TCP_NO_DELAY,
NettyTransport.TCP_KEEP_ALIVE,
NettyTransport.TCP_REUSE_ADDRESS,
NettyTransport.TCP_SEND_BUFFER_SIZE,
NettyTransport.TCP_RECEIVE_BUFFER_SIZE,
NettyTransport.TCP_BLOCKING_SERVER,
TcpTransport.TCP_NO_DELAY,
TcpTransport.TCP_KEEP_ALIVE,
TcpTransport.TCP_REUSE_ADDRESS,
TcpTransport.TCP_SEND_BUFFER_SIZE,
TcpTransport.TCP_RECEIVE_BUFFER_SIZE,
TcpTransport.TCP_BLOCKING_SERVER,
NetworkService.GLOBAL_NETWORK_HOST_SETTING,
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING,
NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING,

View File

@ -19,7 +19,7 @@
package org.elasticsearch.http.netty;
import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.transport.netty.NettyUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.CompositeChannelBuffer;

View File

@ -24,8 +24,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
import org.elasticsearch.transport.netty.NettyUtils;
import org.elasticsearch.http.netty.cors.CorsHandler;
import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
@ -128,7 +127,7 @@ public final class NettyHttpChannel extends AbstractRestChannel {
}
if (content instanceof Releasable) {
future.addListener(new ReleaseChannelFutureListener((Releasable) content));
future.addListener((x) -> ((Releasable)content).close());
addedReleaseListener = true;
}

View File

@ -21,7 +21,7 @@ package org.elasticsearch.http.netty;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.transport.netty.NettyUtils;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.support.RestUtils;
import org.jboss.netty.channel.Channel;

View File

@ -24,8 +24,8 @@ import com.carrotsearch.hppc.IntSet;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.netty.OpenChannelsHandler;
import org.elasticsearch.transport.netty.NettyUtils;
import org.elasticsearch.transport.netty.OpenChannelsHandler;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;

View File

@ -504,7 +504,7 @@ public final class InnerHitBuilder extends ToXContentToBytes implements Writeabl
if (scriptFields != null) {
for (ScriptField field : scriptFields) {
SearchScript searchScript = innerHitsContext.scriptService().search(innerHitsContext.lookup(), field.script(),
ScriptContext.Standard.SEARCH, Collections.emptyMap(), context.getClusterState());
ScriptContext.Standard.SEARCH, Collections.emptyMap());
innerHitsContext.scriptFields().add(new org.elasticsearch.search.fetch.script.ScriptFieldsContext.ScriptField(
field.fieldName(), searchScript, field.ignoreFailure()));
}

View File

@ -157,7 +157,7 @@ public class ScriptQueryBuilder extends AbstractQueryBuilder<ScriptQueryBuilder>
@Override
protected Query doToQuery(QueryShardContext context) throws IOException {
return new ScriptQuery(script, context.getScriptService(), context.lookup(), context.getClusterState());
return new ScriptQuery(script, context.getScriptService(), context.lookup());
}
static class ScriptQuery extends Query {
@ -166,9 +166,9 @@ public class ScriptQueryBuilder extends AbstractQueryBuilder<ScriptQueryBuilder>
private final SearchScript searchScript;
public ScriptQuery(Script script, ScriptService scriptService, SearchLookup searchLookup, ClusterState state) {
public ScriptQuery(Script script, ScriptService scriptService, SearchLookup searchLookup) {
this.script = script;
this.searchScript = scriptService.search(searchLookup, script, ScriptContext.Standard.SEARCH, Collections.emptyMap(), state);
this.searchScript = scriptService.search(searchLookup, script, ScriptContext.Standard.SEARCH, Collections.emptyMap());
}
@Override

View File

@ -177,7 +177,7 @@ public class TemplateQueryBuilder extends AbstractQueryBuilder<TemplateQueryBuil
@Override
protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
ExecutableScript executable = queryRewriteContext.getScriptService().executable(template,
ScriptContext.Standard.SEARCH, Collections.emptyMap(), queryRewriteContext.getClusterState());
ScriptContext.Standard.SEARCH, Collections.emptyMap());
BytesReference querySource = (BytesReference) executable.run();
try (XContentParser qSourceParser = XContentFactory.xContent(querySource).createParser(querySource)) {
final QueryParseContext queryParseContext = queryRewriteContext.newParseContext(qSourceParser);

View File

@ -103,7 +103,7 @@ public class ScriptScoreFunctionBuilder extends ScoreFunctionBuilder<ScriptScore
protected ScoreFunction doToFunction(QueryShardContext context) {
try {
SearchScript searchScript = context.getScriptService().search(context.lookup(), script, ScriptContext.Standard.SEARCH,
Collections.emptyMap(), context.getClusterState());
Collections.emptyMap());
return new ScriptScoreFunction(script, searchScript);
} catch (Exception e) {
throw new QueryShardException(context, "script_score: the script could not be loaded", e);

View File

@ -1,38 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import java.util.Map;
/**
* A processor implementation may modify the data belonging to a document.
* Whether changes are made and what exactly is modified is up to the implementation.
*/
public abstract class AbstractProcessorFactory<P extends Processor> implements Processor.Factory<P> {
public static final String TAG_KEY = "tag";
@Override
public P create(Map<String, Object> config) throws Exception {
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
return doCreate(tag, config);
}
protected abstract P doCreate(String tag, Map<String, Object> config) throws Exception;
}

View File

@ -29,6 +29,8 @@ import java.util.Map;
public final class ConfigurationUtils {
public static final String TAG_KEY = "tag";
private ConfigurationUtils() {
}
@ -255,8 +257,8 @@ public final class ConfigurationUtils {
ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
Processor processor;
processor = factory.create(config);
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
Processor processor = factory.create(tag, config);
if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) {
throw newConfigurationException(processor.getType(), processor.getTag(), Pipeline.ON_FAILURE_KEY,

View File

@ -46,8 +46,7 @@ public class InternalTemplateService implements TemplateService {
CompiledScript compiledScript = scriptService.compile(
script,
ScriptContext.Standard.INGEST,
Collections.emptyMap(),
null); // null == OK, because ingest templates are only inline templates.
Collections.emptyMap());
return new Template() {
@Override
public String execute(Map<String, Object> model) {

View File

@ -45,14 +45,17 @@ public interface Processor {
/**
* A factory that knows how to construct a processor based on a map of maps.
*/
interface Factory<P extends Processor> {
interface Factory {
/**
* Creates a processor based on the specified map of maps config.
*
* @param tag The tag for the processor
* @param config Configuration for the processor to create
*
* Implementations are responsible for removing the used keys, so that after creating a pipeline ingest can
* verify that all configuration settings have been used.
*/
P create(Map<String, Object> config) throws Exception;
Processor create(String tag, Map<String, Object> config) throws Exception;
}
}
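For plugin authors the migration is mechanical: the tag now arrives as a factory argument instead of being read out of the config map by the removed AbstractProcessorFactory. A minimal sketch against the new interface, in which the "uppercase" processor and its field handling are hypothetical, not part of this commit:

import java.util.Locale;
import java.util.Map;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

public class UppercaseProcessorFactory implements Processor.Factory {
    @Override
    public Processor create(String tag, Map<String, Object> config) throws Exception {
        // consume every key that is used so ingest can verify that no unknown
        // settings remain once the pipeline has been created
        String field = ConfigurationUtils.readStringProperty("uppercase", tag, config, "field");
        return new Processor() {
            @Override
            public void execute(IngestDocument document) {
                String value = document.getFieldValue(field, String.class);
                document.setFieldValue(field, value.toUpperCase(Locale.ROOT));
            }

            @Override
            public String getType() {
                return "uppercase";
            }

            @Override
            public String getTag() {
                return tag;
            }
        };
    }
}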

View File

@ -37,12 +37,12 @@ public final class ProcessorsRegistry {
private final ClusterService clusterService;
private ProcessorsRegistry(ScriptService scriptService, ClusterService clusterService,
Map<String, Function<ProcessorsRegistry, Processor.Factory<?>>> providers) {
Map<String, Function<ProcessorsRegistry, Processor.Factory>> providers) {
this.templateService = new InternalTemplateService(scriptService);
this.scriptService = scriptService;
this.clusterService = clusterService;
Map<String, Processor.Factory> processorFactories = new HashMap<>();
for (Map.Entry<String, Function<ProcessorsRegistry, Processor.Factory<?>>> entry : providers.entrySet()) {
for (Map.Entry<String, Function<ProcessorsRegistry, Processor.Factory>> entry : providers.entrySet()) {
processorFactories.put(entry.getKey(), entry.getValue().apply(this));
}
this.processorFactories = Collections.unmodifiableMap(processorFactories);
@ -71,13 +71,13 @@ public final class ProcessorsRegistry {
public static final class Builder {
private final Map<String, Function<ProcessorsRegistry, Processor.Factory<?>>> providers = new HashMap<>();
private final Map<String, Function<ProcessorsRegistry, Processor.Factory>> providers = new HashMap<>();
/**
* Adds a processor factory under a specific name.
*/
public void registerProcessor(String name, Function<ProcessorsRegistry, Processor.Factory<?>> provider) {
Function<ProcessorsRegistry, Processor.Factory<?>> previous = this.providers.putIfAbsent(name, provider);
public void registerProcessor(String name, Function<ProcessorsRegistry, Processor.Factory> provider) {
Function<ProcessorsRegistry, Processor.Factory> previous = this.providers.putIfAbsent(name, provider);
if (previous != null) {
throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]");
}

View File

@ -250,6 +250,7 @@ public class Node implements Closeable {
resourcesToClose.add(resourceWatcherService);
final NetworkService networkService = new NetworkService(settings);
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.add(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
final TribeService tribeService = new TribeService(settings, clusterService);
resourcesToClose.add(tribeService);

View File

@ -62,7 +62,7 @@ public class NodeModule extends AbstractModule {
/**
* Adds a processor factory under a specific type name.
*/
public void registerProcessor(String type, Function<ProcessorsRegistry, Processor.Factory<?>> provider) {
public void registerProcessor(String type, Function<ProcessorsRegistry, Processor.Factory> provider) {
processorsRegistryBuilder.registerProcessor(type, provider);
}
}
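On the plugin side, registration reuses the factory sketched above; a hypothetical wiring (the provider receives the ProcessorsRegistry so factories that need its template or script service can take them from there):

import org.elasticsearch.node.NodeModule;

class UppercasePlugin { // would extend org.elasticsearch.plugins.Plugin in a real plugin
    public void onModule(NodeModule nodeModule) {
        // "uppercase" and UppercaseProcessorFactory are the hypothetical example above
        nodeModule.registerProcessor("uppercase", registry -> new UppercaseProcessorFactory());
    }
}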

View File

@ -28,7 +28,9 @@ import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptReque
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
@ -75,7 +77,7 @@ import java.util.concurrent.ConcurrentMap;
import static java.util.Collections.unmodifiableMap;
public class ScriptService extends AbstractComponent implements Closeable {
public class ScriptService extends AbstractComponent implements Closeable, ClusterStateListener {
static final String DISABLE_DYNAMIC_SCRIPTING_SETTING = "script.disable_dynamic";
@ -105,6 +107,8 @@ public class ScriptService extends AbstractComponent implements Closeable {
private final ParseFieldMatcher parseFieldMatcher;
private final ScriptMetrics scriptMetrics = new ScriptMetrics();
private ClusterState clusterState;
/**
* @deprecated Use {@link org.elasticsearch.script.Script.ScriptField} instead. This should be removed in
* 2.0
@ -217,7 +221,7 @@ public class ScriptService extends AbstractComponent implements Closeable {
/**
* Checks if a script can be executed and compiles it if needed, or returns the previously compiled and cached script.
*/
public CompiledScript compile(Script script, ScriptContext scriptContext, Map<String, String> params, ClusterState state) {
public CompiledScript compile(Script script, ScriptContext scriptContext, Map<String, String> params) {
if (script == null) {
throw new IllegalArgumentException("The parameter script (Script) must not be null.");
}
@ -245,14 +249,14 @@ public class ScriptService extends AbstractComponent implements Closeable {
" operation [" + scriptContext.getKey() + "] and lang [" + lang + "] are not supported");
}
return compileInternal(script, params, state);
return compileInternal(script, params);
}
/**
* Compiles a script straight-away, or returns the previously compiled and cached script,
* without checking if it can be executed based on settings.
*/
CompiledScript compileInternal(Script script, Map<String, String> params, ClusterState state) {
CompiledScript compileInternal(Script script, Map<String, String> params) {
if (script == null) {
throw new IllegalArgumentException("The parameter script (Script) must not be null.");
}
@ -289,7 +293,7 @@ public class ScriptService extends AbstractComponent implements Closeable {
//the script has been updated in the index since the last look up.
final IndexedScript indexedScript = new IndexedScript(lang, name);
name = indexedScript.id;
code = getScriptFromClusterState(state, indexedScript.lang, indexedScript.id);
code = getScriptFromClusterState(indexedScript.lang, indexedScript.id);
}
CacheKey cacheKey = new CacheKey(scriptEngineService, type == ScriptType.INLINE ? null : name, code, params);
@ -328,9 +332,9 @@ public class ScriptService extends AbstractComponent implements Closeable {
return scriptLang;
}
String getScriptFromClusterState(ClusterState state, String scriptLang, String id) {
String getScriptFromClusterState(String scriptLang, String id) {
scriptLang = validateScriptLanguage(scriptLang);
ScriptMetaData scriptMetadata = state.metaData().custom(ScriptMetaData.TYPE);
ScriptMetaData scriptMetadata = clusterState.metaData().custom(ScriptMetaData.TYPE);
if (scriptMetadata == null) {
throw new ResourceNotFoundException("Unable to find script [" + scriptLang + "/" + id + "] in cluster state");
}
@ -443,8 +447,8 @@ public class ScriptService extends AbstractComponent implements Closeable {
/**
* Compiles (or retrieves from cache) and executes the provided script
*/
public ExecutableScript executable(Script script, ScriptContext scriptContext, Map<String, String> params, ClusterState state) {
return executable(compile(script, scriptContext, params, state), script.getParams());
public ExecutableScript executable(Script script, ScriptContext scriptContext, Map<String, String> params) {
return executable(compile(script, scriptContext, params), script.getParams());
}
/**
@ -457,8 +461,8 @@ public class ScriptService extends AbstractComponent implements Closeable {
/**
* Compiles (or retrieves from cache) and executes the provided search script
*/
public SearchScript search(SearchLookup lookup, Script script, ScriptContext scriptContext, Map<String, String> params, ClusterState state) {
CompiledScript compiledScript = compile(script, scriptContext, params, state);
public SearchScript search(SearchLookup lookup, Script script, ScriptContext scriptContext, Map<String, String> params) {
CompiledScript compiledScript = compile(script, scriptContext, params);
return getScriptEngineServiceForLang(compiledScript.lang()).search(compiledScript, lookup, script.getParams());
}
@ -496,6 +500,11 @@ public class ScriptService extends AbstractComponent implements Closeable {
}
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
clusterState = event.state();
}
/**
* A small listener for the script cache that calls each
* {@code ScriptEngineService}'s {@code scriptRemoved} method when the

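The upshot for callers, visible across the call sites in this diff: every compile, executable, and search overload drops its ClusterState parameter, and the service keeps its own view of the state by registering as a ClusterStateListener (the Node change above wires that up). A minimal sketch of the new call pattern, using a hypothetical helper:

import java.util.Collections;
import java.util.Map;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptService;

class UpdateScriptRunner {
    // hypothetical helper mirroring the UpdateHelper change earlier in this diff
    static Object run(ScriptService scriptService, Script script, Map<String, Object> ctx) {
        ExecutableScript executable =
            scriptService.executable(script, ScriptContext.Standard.UPDATE, Collections.emptyMap());
        executable.setNextVar("ctx", ctx);
        return executable.run();
    }
}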
View File

@ -740,7 +740,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
if (source.scriptFields() != null) {
for (org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) {
SearchScript searchScript = context.scriptService().search(context.lookup(), field.script(), ScriptContext.Standard.SEARCH,
Collections.emptyMap(), context.getQueryShardContext().getClusterState());
Collections.emptyMap());
context.scriptFields().add(new ScriptField(field.fieldName(), searchScript, field.ignoreFailure()));
}
}

View File

@ -79,16 +79,16 @@ public class ScriptHeuristic extends SignificanceHeuristic {
@Override
public void initialize(InternalAggregation.ReduceContext context) {
initialize(context.scriptService(), context.clusterState());
initialize(context.scriptService());
}
@Override
public void initialize(SearchContext context) {
initialize(context.scriptService(), context.getQueryShardContext().getClusterState());
initialize(context.scriptService());
}
public void initialize(ScriptService scriptService, ClusterState state) {
searchScript = scriptService.executable(script, ScriptContext.Standard.AGGS, Collections.emptyMap(), state);
public void initialize(ScriptService scriptService) {
searchScript = scriptService.executable(script, ScriptContext.Standard.AGGS, Collections.emptyMap());
searchScript.setNextVar("_subset_freq", subsetDfHolder);
searchScript.setNextVar("_subset_size", subsetSizeHolder);
searchScript.setNextVar("_superset_freq", supersetDfHolder);

View File

@ -92,7 +92,7 @@ public class InternalScriptedMetric extends InternalMetricsAggregation implement
vars.putAll(firstAggregation.reduceScript.getParams());
}
CompiledScript compiledScript = reduceContext.scriptService().compile(firstAggregation.reduceScript,
ScriptContext.Standard.AGGS, Collections.emptyMap(), reduceContext.clusterState());
ScriptContext.Standard.AGGS, Collections.emptyMap());
ExecutableScript script = reduceContext.scriptService().executable(compiledScript, vars);
aggregation = script.run();
} else {

View File

@ -54,11 +54,11 @@ public class ScriptedMetricAggregator extends MetricsAggregator {
ScriptService scriptService = context.searchContext().scriptService();
ClusterState state = context.searchContext().getQueryShardContext().getClusterState();
if (initScript != null) {
scriptService.executable(initScript, ScriptContext.Standard.AGGS, Collections.emptyMap(), state).run();
scriptService.executable(initScript, ScriptContext.Standard.AGGS, Collections.emptyMap()).run();
}
this.mapScript = scriptService.search(context.searchContext().lookup(), mapScript, ScriptContext.Standard.AGGS, Collections.emptyMap(), state);
this.mapScript = scriptService.search(context.searchContext().lookup(), mapScript, ScriptContext.Standard.AGGS, Collections.emptyMap());
if (combineScript != null) {
this.combineScript = scriptService.executable(combineScript, ScriptContext.Standard.AGGS, Collections.emptyMap(), state);
this.combineScript = scriptService.executable(combineScript, ScriptContext.Standard.AGGS, Collections.emptyMap());
} else {
this.combineScript = null;
}

View File

@ -106,7 +106,7 @@ public class TopHitsAggregatorFactory extends AggregatorFactory<TopHitsAggregato
if (scriptFields != null) {
for (ScriptField field : scriptFields) {
SearchScript searchScript = subSearchContext.scriptService().search(subSearchContext.lookup(), field.script(),
ScriptContext.Standard.SEARCH, Collections.emptyMap(), subSearchContext.getQueryShardContext().getClusterState());
ScriptContext.Standard.SEARCH, Collections.emptyMap());
subSearchContext.scriptFields().add(new org.elasticsearch.search.fetch.script.ScriptFieldsContext.ScriptField(
field.fieldName(), searchScript, field.ignoreFailure()));
}

View File

@ -93,7 +93,7 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator {
List<? extends Bucket> buckets = originalAgg.getBuckets();
CompiledScript compiledScript = reduceContext.scriptService().compile(script, ScriptContext.Standard.AGGS,
Collections.emptyMap(), reduceContext.clusterState());
Collections.emptyMap());
List newBuckets = new ArrayList<>();
for (Bucket bucket : buckets) {
Map<String, Object> vars = new HashMap<>();

View File

@ -90,7 +90,7 @@ public class BucketSelectorPipelineAggregator extends PipelineAggregator {
List<? extends Bucket> buckets = originalAgg.getBuckets();
CompiledScript compiledScript = reduceContext.scriptService().compile(script, ScriptContext.Standard.AGGS,
Collections.emptyMap(), reduceContext.clusterState());
Collections.emptyMap());
List newBuckets = new ArrayList<>();
for (Bucket bucket : buckets) {
Map<String, Object> vars = new HashMap<>();

View File

@ -377,8 +377,7 @@ public abstract class ValuesSourceAggregationBuilder<VS extends ValuesSource, AB
private SearchScript createScript(Script script, SearchContext context) {
return script == null ? null
: context.scriptService().search(context.lookup(), script, ScriptContext.Standard.AGGS, Collections.emptyMap(),
context.getQueryShardContext().getClusterState());
: context.scriptService().search(context.lookup(), script, ScriptContext.Standard.AGGS, Collections.emptyMap());
}
private static DocValueFormat resolveFormat(@Nullable String format, @Nullable ValueType valueType) {

View File

@ -304,7 +304,7 @@ public class ScriptSortBuilder extends SortBuilder<ScriptSortBuilder> {
@Override
public SortFieldAndFormat build(QueryShardContext context) throws IOException {
final SearchScript searchScript = context.getScriptService().search(
context.lookup(), script, ScriptContext.Standard.SEARCH, Collections.emptyMap(), context.getClusterState());
context.lookup(), script, ScriptContext.Standard.SEARCH, Collections.emptyMap());
MultiValueMode valueMode = null;
if (sortMode != null) {

View File

@ -631,7 +631,7 @@ public class PhraseSuggestionBuilder extends SuggestionBuilder<PhraseSuggestionB
if (this.collateQuery != null) {
CompiledScript compiledScript = context.getScriptService().compile(this.collateQuery, ScriptContext.Standard.SEARCH,
Collections.emptyMap(), context.getClusterState());
Collections.emptyMap());
suggestionContext.setCollateQueryScript(compiledScript);
if (this.collateParams != null) {
suggestionContext.setCollateScriptParams(this.collateParams);

View File

@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
public class TcpHeader {
public static final int MARKER_BYTES_SIZE = 2 * 1;
public static final int MESSAGE_LENGTH_SIZE = 4;
public static final int REQUEST_ID_SIZE = 8;
public static final int STATUS_SIZE = 1;
public static final int VERSION_ID_SIZE = 4;
public static final int HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE;
public static void writeHeader(StreamOutput output, long requestId, byte status, Version version, int messageSize) throws IOException {
output.writeByte((byte)'E');
output.writeByte((byte)'S');
// write the size, the size indicates the remaining message size, not including the size int
output.writeInt(messageSize - TcpHeader.MARKER_BYTES_SIZE - TcpHeader.MESSAGE_LENGTH_SIZE);
output.writeLong(requestId);
output.writeByte(status);
output.writeInt(version.id);
}
}
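A small sketch of prefixing a frame with this header, assuming BytesStreamOutput; messageSize is the total on-wire size, so the encoded length field comes out as the payload plus the request-id, status, and version bytes:

import java.io.IOException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.transport.TcpHeader;

class TcpHeaderSketch {
    // hypothetical helper: writes the 19-byte header for a payload of the given length
    static BytesStreamOutput header(long requestId, byte status, int payloadLength) throws IOException {
        BytesStreamOutput out = new BytesStreamOutput();
        TcpHeader.writeHeader(out, requestId, status, Version.CURRENT,
            TcpHeader.HEADER_SIZE + payloadLength);
        return out;
    }
}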

File diff suppressed because it is too large

View File

@ -0,0 +1,103 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
*/
public final class TcpTransportChannel<Channel> implements TransportChannel {
private final TcpTransport<Channel> transport;
protected final Version version;
protected final String action;
protected final long requestId;
private final String profileName;
private final long reservedBytes;
private final AtomicBoolean released = new AtomicBoolean();
private final String channelType;
private final Channel channel;
public TcpTransportChannel(TcpTransport<Channel> transport, Channel channel, String channelType, String action,
long requestId, Version version, String profileName, long reservedBytes) {
this.version = version;
this.channel = channel;
this.transport = transport;
this.action = action;
this.requestId = requestId;
this.profileName = profileName;
this.reservedBytes = reservedBytes;
this.channelType = channelType;
}
@Override
public final String getProfileName() {
return profileName;
}
@Override
public final String action() {
return this.action;
}
@Override
public final void sendResponse(TransportResponse response) throws IOException {
sendResponse(response, TransportResponseOptions.EMPTY);
}
@Override
public final void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
release();
transport.sendResponse(version, channel, response, requestId, action, options);
}
@Override
public void sendResponse(Throwable error) throws IOException {
release();
transport.sendErrorResponse(version, channel, error, requestId, action);
}
private void release() {
// attempt to release once atomically
if (released.compareAndSet(false, true) == false) {
throw new IllegalStateException("reserved bytes are already released");
}
transport.getInFlightRequestBreaker().addWithoutBreaking(-reservedBytes);
}
@Override
public final long getRequestId() {
return requestId;
}
@Override
public final String getChannelType() {
return channelType;
}
public Channel getChannel() {
return channel;
}
}
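One behavioral detail: release() flips the AtomicBoolean, so the circuit-breaker reservation is returned exactly once and any second send fails fast. A usage-level sketch with a hypothetical handler:

import java.io.IOException;
import org.elasticsearch.transport.TcpTransportChannel;
import org.elasticsearch.transport.TransportResponse;

class ChannelUsageSketch {
    static void answer(TcpTransportChannel<?> channel) throws IOException {
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
        // a second sendResponse(...) here would throw IllegalStateException
        // ("reserved bytes are already released")
    }
}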

View File

@ -20,6 +20,8 @@
package org.elasticsearch.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@ -94,4 +96,9 @@ public interface Transport extends LifecycleComponent<Transport> {
long serverOpen();
List<String> getLocalAddresses();
default CircuitBreaker getInFlightRequestBreaker() {
return new NoopCircuitBreaker("in-flight-noop");
}
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.transport;
import org.elasticsearch.transport.local.LocalTransport;
import org.elasticsearch.transport.netty.NettyTransport;
import java.util.Arrays;
@ -39,10 +38,10 @@ public enum Transports {
final String threadName = t.getName();
for (String s : Arrays.asList(
LocalTransport.LOCAL_TRANSPORT_THREAD_NAME_PREFIX,
NettyTransport.HTTP_SERVER_BOSS_THREAD_NAME_PREFIX,
NettyTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
NettyTransport.TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX,
NettyTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX,
TcpTransport.HTTP_SERVER_BOSS_THREAD_NAME_PREFIX,
TcpTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX,
TEST_MOCK_TRANSPORT_THREAD_PREFIX)) {
if (threadName.contains(s)) {
return true;

View File

@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.netty;
package org.elasticsearch.transport.netty;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.transport.netty.ChannelBufferStreamInputFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
@ -34,9 +33,12 @@ import java.nio.charset.StandardCharsets;
final class ChannelBufferBytesReference implements BytesReference {
private final ChannelBuffer buffer;
private final int size;
ChannelBufferBytesReference(ChannelBuffer buffer) {
ChannelBufferBytesReference(ChannelBuffer buffer, int size) {
this.buffer = buffer;
this.size = size;
assert size <= buffer.readableBytes() : "size[" + size +"] > " + buffer.readableBytes();
}
@Override
@ -46,25 +48,24 @@ final class ChannelBufferBytesReference implements BytesReference {
@Override
public int length() {
return buffer.readableBytes();
return size;
}
@Override
public BytesReference slice(int from, int length) {
return new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex() + from, length));
return new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex() + from, length), length);
}
@Override
public StreamInput streamInput() {
return ChannelBufferStreamInputFactory.create(buffer.duplicate());
return new ChannelBufferStreamInput(buffer.duplicate(), size);
}
@Override
public void writeTo(OutputStream os) throws IOException {
buffer.getBytes(buffer.readerIndex(), os, length());
buffer.getBytes(buffer.readerIndex(), os, size);
}
@Override
public byte[] toBytes() {
return copyBytesArray().toBytes();
}
@ -72,7 +73,7 @@ final class ChannelBufferBytesReference implements BytesReference {
@Override
public BytesArray toBytesArray() {
if (buffer.hasArray()) {
return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), size);
}
return copyBytesArray();
}
@ -111,7 +112,7 @@ final class ChannelBufferBytesReference implements BytesReference {
@Override
public BytesRef toBytesRef() {
if (buffer.hasArray()) {
return new BytesRef(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
return new BytesRef(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), size);
}
byte[] copy = new byte[buffer.readableBytes()];
buffer.getBytes(buffer.readerIndex(), copy);
@ -120,7 +121,7 @@ final class ChannelBufferBytesReference implements BytesReference {
@Override
public BytesRef copyBytesRef() {
byte[] copy = new byte[buffer.readableBytes()];
byte[] copy = new byte[size];
buffer.getBytes(buffer.readerIndex(), copy);
return new BytesRef(copy);
}

View File

@ -22,16 +22,14 @@ package org.elasticsearch.transport.netty;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.netty.NettyUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.EOFException;
import java.io.IOException;
/**
* A Netty {@link org.jboss.netty.buffer.ChannelBuffer} based {@link org.elasticsearch.common.io.stream.StreamInput}.
*/
public class ChannelBufferStreamInput extends StreamInput {
class ChannelBufferStreamInput extends StreamInput {
private final ChannelBuffer buffer;
private final int startIndex;

View File

@ -1,36 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
/**
*/
public class ChannelBufferStreamInputFactory {
public static StreamInput create(ChannelBuffer buffer) {
return new ChannelBufferStreamInput(buffer, buffer.readableBytes());
}
public static StreamInput create(ChannelBuffer buffer, int size) {
return new ChannelBufferStreamInput(buffer, size);
}
}

View File

@ -1,437 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotCompressedException;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportSerializationException;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.support.TransportStatus;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
* to the relevant action.
*/
public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
protected final ESLogger logger;
protected final ThreadPool threadPool;
protected final TransportServiceAdapter transportServiceAdapter;
protected final NettyTransport transport;
protected final String profileName;
private final ThreadContext threadContext;
public MessageChannelHandler(NettyTransport transport, ESLogger logger, String profileName) {
this.threadPool = transport.threadPool();
this.threadContext = threadPool.getThreadContext();
this.transportServiceAdapter = transport.transportServiceAdapter();
this.transport = transport;
this.logger = logger;
this.profileName = profileName;
}
@Override
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
transportServiceAdapter.sent(e.getWrittenAmount());
super.writeComplete(ctx, e);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Transports.assertTransportThread();
Object m = e.getMessage();
if (!(m instanceof ChannelBuffer)) {
ctx.sendUpstream(e);
return;
}
ChannelBuffer buffer = (ChannelBuffer) m;
Marker marker = new Marker(buffer);
int size = marker.messageSizeWithRemainingHeaders();
transportServiceAdapter.received(marker.messageSizeWithAllHeaders());
// we have additional bytes to read, outside of the header
boolean hasMessageBytesToRead = marker.messageSize() != 0;
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumulation buffer, which is cleaned each time
StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
boolean success = false;
try (ThreadContext.StoredContext tCtx = threadContext.stashContext()) {
long requestId = streamIn.readLong();
byte status = streamIn.readByte();
Version version = Version.fromId(streamIn.readInt());
if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
Compressor compressor;
try {
compressor = CompressorFactory.compressor(NettyUtils.toBytesReference(buffer));
} catch (NotCompressedException ex) {
int maxToRead = Math.min(buffer.readableBytes(), 10);
int offset = buffer.readerIndex();
StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead)
.append("] content bytes out of [").append(buffer.readableBytes())
.append("] readable bytes with message size [").append(size).append("] ").append("] are [");
for (int i = 0; i < maxToRead; i++) {
sb.append(buffer.getByte(offset + i)).append(",");
}
sb.append("]");
throw new IllegalStateException(sb.toString());
}
streamIn = compressor.streamInput(streamIn);
}
if (version.onOrAfter(Version.CURRENT.minimumCompatibilityVersion()) == false || version.major != Version.CURRENT.major) {
throw new IllegalStateException("Received message from unsupported version: [" + version
+ "] minimal compatible version is: [" +Version.CURRENT.minimumCompatibilityVersion() + "]");
}
streamIn.setVersion(version);
if (TransportStatus.isRequest(status)) {
threadContext.readHeaders(streamIn);
handleRequest(ctx.getChannel(), marker, streamIn, requestId, size, version);
} else {
TransportResponseHandler<?> handler = transportServiceAdapter.onResponseReceived(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (TransportStatus.isError(status)) {
handlerResponseError(streamIn, handler);
} else {
handleResponse(ctx.getChannel(), streamIn, handler);
}
marker.validateResponse(streamIn, requestId, handler, TransportStatus.isError(status));
}
}
success = true;
} finally {
try {
if (success) {
IOUtils.close(streamIn);
} else {
IOUtils.closeWhileHandlingException(streamIn);
}
} finally {
// Set the expected position of the buffer, no matter what happened
buffer.readerIndex(marker.expectedReaderIndex());
}
}
}
protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
final TransportResponse response = handler.newInstance();
response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
response.remoteAddress();
try {
response.readFrom(buffer);
} catch (Throwable e) {
handleException(handler, new TransportSerializationException(
"Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
return;
}
try {
if (ThreadPool.Names.SAME.equals(handler.executor())) {
//noinspection unchecked
handler.handleResponse(response);
} else {
threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
}
} catch (Throwable e) {
handleException(handler, new ResponseHandlerFailureTransportException(e));
}
}
private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) {
Throwable error;
try {
error = buffer.readThrowable();
} catch (Throwable e) {
error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
}
handleException(handler, error);
}
private void handleException(final TransportResponseHandler handler, Throwable error) {
if (!(error instanceof RemoteTransportException)) {
error = new RemoteTransportException(error.getMessage(), error);
}
final RemoteTransportException rtx = (RemoteTransportException) error;
if (ThreadPool.Names.SAME.equals(handler.executor())) {
try {
handler.handleException(rtx);
} catch (Throwable e) {
logger.error("failed to handle exception response [{}]", e, handler);
}
} else {
threadPool.executor(handler.executor()).execute(new Runnable() {
@Override
public void run() {
try {
handler.handleException(rtx);
} catch (Throwable e) {
logger.error("failed to handle exception response [{}]", e, handler);
}
}
});
}
}
protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId, int messageLengthBytes,
Version version) throws IOException {
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
final String action = buffer.readString();
transportServiceAdapter.onRequestReceived(requestId, action);
NettyTransportChannel transportChannel = null;
try {
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException(action);
}
if (reg.canTripCircuitBreaker()) {
transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
} else {
transport.inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes);
}
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
requestId, version, profileName, messageLengthBytes);
final TransportRequest request = reg.newRequest();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);
// note: if an exception was thrown above (e.g. when the circuit breaker limit was hit), we never reach this verification
validateRequest(marker, buffer, requestId, action);
if (ThreadPool.Names.SAME.equals(reg.getExecutor())) {
//noinspection unchecked
reg.processMessageReceived(request, transportChannel);
} else {
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
}
} catch (Throwable e) {
// the circuit breaker tripped
if (transportChannel == null) {
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
requestId, version, profileName, 0);
}
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
logger.warn("Failed to send error message back to client for action [{}]", e, action);
logger.warn("Actual Exception", e1);
}
}
return action;
}
// This template method is needed to inject custom error checking logic in tests.
protected void validateRequest(Marker marker, StreamInput buffer, long requestId, String action) throws IOException {
marker.validateRequest(buffer, requestId, action);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
transport.exceptionCaught(ctx, e);
}
class ResponseHandler implements Runnable {
private final TransportResponseHandler handler;
private final TransportResponse response;
public ResponseHandler(TransportResponseHandler handler, TransportResponse response) {
this.handler = handler;
this.response = response;
}
@SuppressWarnings({"unchecked"})
@Override
public void run() {
try {
handler.handleResponse(response);
} catch (Throwable e) {
handleException(handler, new ResponseHandlerFailureTransportException(e));
}
}
}
class RequestHandler extends AbstractRunnable {
private final RequestHandlerRegistry reg;
private final TransportRequest request;
private final NettyTransportChannel transportChannel;
public RequestHandler(RequestHandlerRegistry reg, TransportRequest request, NettyTransportChannel transportChannel) {
this.reg = reg;
this.request = request;
this.transportChannel = transportChannel;
}
@SuppressWarnings({"unchecked"})
@Override
protected void doRun() throws Exception {
reg.processMessageReceived(request, transportChannel);
}
@Override
public boolean isForceExecution() {
return reg.isForceExecution();
}
@Override
public void onFailure(Throwable e) {
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
// we can only send a response if the transport is started
try {
transportChannel.sendResponse(e);
} catch (Throwable e1) {
logger.warn("Failed to send error message back to client for action [{}]", e1, reg.getAction());
logger.warn("Actual Exception", e);
}
}
}
}
/**
* Internal helper class to store characteristic offsets of a buffer during processing
*/
protected static final class Marker {
private final ChannelBuffer buffer;
private final int remainingMessageSize;
private final int expectedReaderIndex;
public Marker(ChannelBuffer buffer) {
this.buffer = buffer;
// when this constructor is called, we have already read two parts of the message header: the marker bytes and the
// message length (see SizeHeaderFrameDecoder). Hence we have to rewind the index by MESSAGE_LENGTH_SIZE bytes to read
// the remaining message length again.
this.remainingMessageSize = buffer.getInt(buffer.readerIndex() - NettyHeader.MESSAGE_LENGTH_SIZE);
this.expectedReaderIndex = buffer.readerIndex() + remainingMessageSize;
}
/**
* @return the number of bytes that have yet to be read from the buffer
*/
public int messageSizeWithRemainingHeaders() {
return remainingMessageSize;
}
/**
* @return the number of bytes for the message including all headers (even the ones that have been read from the buffer already)
*/
public int messageSizeWithAllHeaders() {
return remainingMessageSize + NettyHeader.MARKER_BYTES_SIZE + NettyHeader.MESSAGE_LENGTH_SIZE;
}
/**
* @return the number of bytes for the message itself (excluding all headers).
*/
public int messageSize() {
return messageSizeWithAllHeaders() - NettyHeader.HEADER_SIZE;
}
/**
* @return the expected index of the buffer's reader after the message has been consumed entirely.
*/
public int expectedReaderIndex() {
return expectedReaderIndex;
}
/**
* Validates that a request has been fully read (not too few bytes but also not too many bytes).
*
* @param stream A stream that is associated with the buffer that is tracked by this marker.
* @param requestId The current request id.
* @param action The currently executed action.
* @throws IOException Iff the stream could not be read.
* @throws IllegalStateException Iff the request has not been fully read.
*/
public void validateRequest(StreamInput stream, long requestId, String action) throws IOException {
final int nextByte = stream.read();
// calling read() is useful to make sure the message is fully read, even if there is some kind of EOS marker
if (nextByte != -1) {
throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedReaderIndex + "]; resetting");
}
if (buffer.readerIndex() < expectedReaderIndex) {
throw new IllegalStateException("Message is fully read (request), yet there are "
+ (expectedReaderIndex - buffer.readerIndex()) + " remaining bytes; resetting");
}
if (buffer.readerIndex() > expectedReaderIndex) {
throw new IllegalStateException(
"Message read past expected size (request) for requestId [" + requestId + "], action [" + action
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedReaderIndex + "]; resetting");
}
}
/**
* Validates that a response has been fully read (not too few bytes but also not too many bytes).
*
* @param stream A stream that is associated with the buffer that is tracked by this marker.
* @param requestId The corresponding request id for this response.
* @param handler The current response handler.
* @param error Whether an error response is being validated.
* @throws IOException Iff the stream could not be read.
* @throws IllegalStateException Iff the response has not been fully read.
*/
public void validateResponse(StreamInput stream, long requestId,
TransportResponseHandler<?> handler, boolean error) throws IOException {
// Check the entire message has been read
final int nextByte = stream.read();
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
if (nextByte != -1) {
throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
+ handler + "], error [" + error + "]; resetting");
}
if (buffer.readerIndex() < expectedReaderIndex) {
throw new IllegalStateException("Message is fully read (response), yet there are "
+ (expectedReaderIndex - buffer.readerIndex()) + " remaining bytes; resetting");
}
if (buffer.readerIndex() > expectedReaderIndex) {
throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId
+ "], handler [" + handler + "], error [" + error + "]; resetting");
}
}
}
}
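To make the Marker arithmetic above concrete, here is a small worked example. It assumes the NettyHeader constants shown in the next file (MARKER_BYTES_SIZE = 2, MESSAGE_LENGTH_SIZE = 4, HEADER_SIZE = 19) and a hypothetical 87-byte payload; the numbers, not the class, are the point.

public class MarkerArithmeticExample {
    public static void main(String[] args) {
        // By the time the Marker is constructed, the two marker bytes and the
        // 4-byte length field have been consumed, so readerIndex is 6.
        int readerIndex = 6;
        // The length field counts everything after itself: request id (8),
        // status (1), version (4) and the hypothetical 87-byte payload.
        int remainingMessageSize = 8 + 1 + 4 + 87;                    // 100
        int expectedReaderIndex = readerIndex + remainingMessageSize; // 106
        int withAllHeaders = remainingMessageSize + 2 + 4;            // 106
        int messageSize = withAllHeaders - 19;                        // 87 payload bytes
        System.out.println(expectedReaderIndex + " " + withAllHeaders + " " + messageSize);
    }
}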

View File

@@ -1,76 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty;
import org.elasticsearch.Version;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
/**
*/
public class NettyHeader {
public static final int MARKER_BYTES_SIZE = 2 * 1;
public static final int MESSAGE_LENGTH_SIZE = 4;
public static final int REQUEST_ID_SIZE = 8;
public static final int STATUS_SIZE = 1;
public static final int VERSION_ID_SIZE = 4;
public static final int HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE;
/**
* The magic number (must be lower than 0) for a ping message. This is handled
* specifically in {@link org.elasticsearch.transport.netty.SizeHeaderFrameDecoder}.
*/
public static final int PING_DATA_SIZE = -1;
private final static ChannelBuffer pingHeader;
static {
pingHeader = ChannelBuffers.buffer(6);
pingHeader.writeByte('E');
pingHeader.writeByte('S');
pingHeader.writeInt(PING_DATA_SIZE);
}
/**
* A ping header is the same as a regular header, just with -1 for the size of the message.
*/
public static ChannelBuffer pingHeader() {
return pingHeader.duplicate();
}
public static void writeHeader(ChannelBuffer buffer, long requestId, byte status, Version version) {
int index = buffer.readerIndex();
buffer.setByte(index, 'E');
index += 1;
buffer.setByte(index, 'S');
index += 1;
// write the size; it indicates the remaining message size, not including the size int itself
buffer.setInt(index, buffer.readableBytes() - MARKER_BYTES_SIZE - MESSAGE_LENGTH_SIZE);
index += MESSAGE_LENGTH_SIZE;
buffer.setLong(index, requestId);
index += REQUEST_ID_SIZE;
buffer.setByte(index, status);
index += STATUS_SIZE;
buffer.setInt(index, version.id);
}
}
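writeHeader above is designed for a back-fill pattern: the caller reserves HEADER_SIZE bytes, serializes the payload, then writes the header over the reserved region (NettyTransportChannel later in this commit does this via bStream.skip(NettyHeader.HEADER_SIZE)). A minimal sketch, assuming a plain dynamic Netty 3 buffer and a dummy payload:

import org.elasticsearch.Version;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

public class HeaderBackfillSketch {
    public static void main(String[] args) {
        ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
        buffer.writeZero(NettyHeader.HEADER_SIZE); // reserve the 19 header bytes
        buffer.writeBytes(new byte[] {1, 2, 3});   // pretend payload
        // The length field becomes readableBytes() minus the 6 marker + length bytes.
        NettyHeader.writeHeader(buffer, 42L, (byte) 0, Version.CURRENT);
    }
}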

View File

@@ -27,11 +27,11 @@ import org.jboss.netty.logging.AbstractInternalLogger;
*
*/
@SuppressLoggerChecks(reason = "safely delegates to logger")
public class NettyInternalESLogger extends AbstractInternalLogger {
final class NettyInternalESLogger extends AbstractInternalLogger {
private final ESLogger logger;
public NettyInternalESLogger(ESLogger logger) {
NettyInternalESLogger(ESLogger logger) {
this.logger = logger;
}

View File

@@ -1,35 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.logging.Loggers;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
*
*/
public class NettyInternalESLoggerFactory extends InternalLoggerFactory {
@Override
public InternalLogger newInstance(String name) {
return new NettyInternalESLogger(Loggers.getLogger(name));
}
}

View File

@@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.TcpHeader;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TcpTransportChannel;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.Transports;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import java.net.InetSocketAddress;
/**
* A handler (must be the last one!) that does size-based frame decoding and forwards the actual message
* to the relevant action.
*/
class NettyMessageChannelHandler extends SimpleChannelUpstreamHandler {
protected final TransportServiceAdapter transportServiceAdapter;
protected final NettyTransport transport;
protected final String profileName;
NettyMessageChannelHandler(NettyTransport transport, String profileName) {
this.transportServiceAdapter = transport.transportServiceAdapter();
this.transport = transport;
this.profileName = profileName;
}
@Override
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
transportServiceAdapter.sent(e.getWrittenAmount());
super.writeComplete(ctx, e);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Transports.assertTransportThread();
Object m = e.getMessage();
if (!(m instanceof ChannelBuffer)) {
ctx.sendUpstream(e);
return;
}
final ChannelBuffer buffer = (ChannelBuffer) m;
final int remainingMessageSize = buffer.getInt(buffer.readerIndex() - TcpHeader.MESSAGE_LENGTH_SIZE);
final int expectedReaderIndex = buffer.readerIndex() + remainingMessageSize;
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.getChannel().getRemoteAddress();
try {
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumulation buffer, which is cleaned each time so it could be bigger than the actual size
BytesReference reference = NettyUtils.toBytesReference(buffer, remainingMessageSize);
transport.messageReceived(reference, ctx.getChannel(), profileName, remoteAddress, remainingMessageSize);
} finally {
// Set the expected position of the buffer, no matter what happened
buffer.readerIndex(expectedReaderIndex);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
transport.exceptionCaught(ctx, e);
}
}
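Since the class comment requires this handler to sit last, a server pipeline would be wired roughly as below. This is a hedged sketch: the real wiring lives in NettyTransport's channel pipeline factories, and the handler names and "default" profile here are assumptions.

package org.elasticsearch.transport.netty;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.Channels;

class PipelineWiringSketch {
    // Hypothetical wiring; names and the "default" profile are illustrative.
    static ChannelPipeline wire(NettyTransport transport) {
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("size", new SizeHeaderFrameDecoder());   // frame decoding first
        pipeline.addLast("dispatcher", new NettyMessageChannelHandler(transport, "default")); // must be last
        return pipeline;
    }
}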

View File

@@ -1,176 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.support.TransportStatus;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
public class NettyTransportChannel implements TransportChannel {
private final NettyTransport transport;
private final TransportServiceAdapter transportServiceAdapter;
private final Version version;
private final String action;
private final Channel channel;
private final long requestId;
private final String profileName;
private final long reservedBytes;
private final AtomicBoolean released = new AtomicBoolean();
public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter transportServiceAdapter, String action, Channel channel,
long requestId, Version version, String profileName, long reservedBytes) {
this.transportServiceAdapter = transportServiceAdapter;
this.version = version;
this.transport = transport;
this.action = action;
this.channel = channel;
this.requestId = requestId;
this.profileName = profileName;
this.reservedBytes = reservedBytes;
}
@Override
public String getProfileName() {
return profileName;
}
@Override
public String action() {
return this.action;
}
@Override
public void sendResponse(TransportResponse response) throws IOException {
sendResponse(response, TransportResponseOptions.EMPTY);
}
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
release();
if (transport.compress) {
options = TransportResponseOptions.builder(options).withCompress(transport.compress).build();
}
byte status = 0;
status = TransportStatus.setResponse(status);
ReleasableBytesStreamOutput bStream = null;
boolean addedReleaseListener = false;
try {
bStream = new ReleasableBytesStreamOutput(transport.bigArrays);
bStream.skip(NettyHeader.HEADER_SIZE);
StreamOutput stream = bStream;
if (options.compress()) {
status = TransportStatus.setCompress(status);
stream = CompressorFactory.COMPRESSOR.streamOutput(stream);
}
stream.setVersion(version);
response.writeTo(stream);
stream.close();
ReleasablePagedBytesReference bytes = bStream.bytes();
ChannelBuffer buffer = NettyUtils.toChannelBuffer(bytes);
NettyHeader.writeHeader(buffer, requestId, status, version);
ChannelFuture future = channel.write(buffer);
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
future.addListener(listener);
addedReleaseListener = true;
final TransportResponseOptions finalOptions = options;
ChannelFutureListener onResponseSentListener =
f -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions);
future.addListener(onResponseSentListener);
} finally {
if (!addedReleaseListener && bStream != null) {
Releasables.close(bStream.bytes());
}
}
}
@Override
public void sendResponse(Throwable error) throws IOException {
release();
BytesStreamOutput stream = new BytesStreamOutput();
stream.skip(NettyHeader.HEADER_SIZE);
RemoteTransportException tx = new RemoteTransportException(
transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
stream.writeThrowable(tx);
byte status = 0;
status = TransportStatus.setResponse(status);
status = TransportStatus.setError(status);
BytesReference bytes = stream.bytes();
ChannelBuffer buffer = NettyUtils.toChannelBuffer(bytes);
NettyHeader.writeHeader(buffer, requestId, status, version);
ChannelFuture future = channel.write(buffer);
ChannelFutureListener onResponseSentListener =
f -> transportServiceAdapter.onResponseSent(requestId, action, error);
future.addListener(onResponseSentListener);
}
private void release() {
// attempt to release once atomically
if (released.compareAndSet(false, true) == false) {
throw new IllegalStateException("reserved bytes are already released");
}
transport.inFlightRequestsBreaker().addWithoutBreaking(-reservedBytes);
}
@Override
public long getRequestId() {
return requestId;
}
@Override
public String getChannelType() {
return "netty";
}
/**
* Returns the underlying netty channel. This method is intended to be used for access to netty to get additional
* details when processing the request and may be used by plugins. Responses should be sent using the methods
* defined in this class and not directly on the channel.
* @return underlying netty channel
*/
public Channel getChannel() {
return channel;
}
}

View File

@@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.netty;
package org.elasticsearch.transport.netty;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
import org.elasticsearch.common.logging.Loggers;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.logging.InternalLogger;
@@ -93,10 +93,11 @@ public class NettyUtils {
}
static {
InternalLoggerFactory.setDefaultFactory(new NettyInternalESLoggerFactory() {
InternalLoggerFactory.setDefaultFactory(new InternalLoggerFactory() {
@Override
public InternalLogger newInstance(String name) {
return super.newInstance(name.replace("org.jboss.netty.", "netty.").replace("org.jboss.netty.", "netty."));
name = name.replace("org.jboss.netty.", "netty.").replace("org.jboss.netty.", "netty.");
return new NettyInternalESLogger(Loggers.getLogger(name));
}
});
@@ -136,6 +137,13 @@ public class NettyUtils {
* Wraps the given ChannelBuffer with a BytesReference
*/
public static BytesReference toBytesReference(ChannelBuffer channelBuffer) {
return new ChannelBufferBytesReference(channelBuffer);
return toBytesReference(channelBuffer, channelBuffer.readableBytes());
}
/**
* Wraps the given ChannelBuffer with a BytesReference of a given size
*/
public static BytesReference toBytesReference(ChannelBuffer channelBuffer, int size) {
return new ChannelBufferBytesReference(channelBuffer, size);
}
}
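A usage note on the new size-bounded overload: the cumulation buffer can hold more bytes than the current message, so callers wrap exactly the message length (as NettyMessageChannelHandler does above). A minimal sketch; the class and method names here are made up:

package org.elasticsearch.transport.netty;

import org.elasticsearch.common.bytes.BytesReference;
import org.jboss.netty.buffer.ChannelBuffer;

class SliceSketch {
    // Wrap exactly the message bytes; the cumulation buffer itself may hold more.
    static BytesReference sliceMessage(ChannelBuffer buffer, int remainingMessageSize) {
        return NettyUtils.toBytesReference(buffer, remainingMessageSize);
    }
}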

View File

@@ -17,8 +17,9 @@
* under the License.
*/
package org.elasticsearch.common.netty;
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -32,13 +33,14 @@ import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import java.io.Closeable;
import java.util.Set;
/**
*
*/
@ChannelHandler.Sharable
public class OpenChannelsHandler implements ChannelUpstreamHandler {
public class OpenChannelsHandler implements ChannelUpstreamHandler, Releasable {
final Set<Channel> openChannels = ConcurrentCollections.newConcurrentSet();
final CounterMetric openChannelsMetric = new CounterMetric();
@@ -91,6 +93,7 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler {
return totalChannelsMetric.count();
}
@Override
public void close() {
for (Channel channel : openChannels) {
channel.close().awaitUninterruptibly();

View File

@@ -19,107 +19,29 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TcpHeader;
import org.elasticsearch.transport.TcpTransport;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import java.io.IOException;
import java.io.StreamCorruptedException;
/**
*/
public class SizeHeaderFrameDecoder extends FrameDecoder {
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().bytes() * 0.9);
final class SizeHeaderFrameDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
final int sizeHeaderLength = NettyHeader.MARKER_BYTES_SIZE + NettyHeader.MESSAGE_LENGTH_SIZE;
if (buffer.readableBytes() < sizeHeaderLength) {
try {
boolean continueProcessing = TcpTransport.validateMessageHeader(NettyUtils.toBytesReference(buffer));
buffer.skipBytes(TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE);
return continueProcessing ? buffer : null;
} catch (IllegalArgumentException ex) {
throw new TooLongFrameException(ex.getMessage(), ex);
} catch (IllegalStateException ex) {
return null;
}
int readerIndex = buffer.readerIndex();
if (buffer.getByte(readerIndex) != 'E' || buffer.getByte(readerIndex + 1) != 'S') {
// special handling for what is probably HTTP
if (bufferStartsWith(buffer, readerIndex, "GET ") ||
bufferStartsWith(buffer, readerIndex, "POST ") ||
bufferStartsWith(buffer, readerIndex, "PUT ") ||
bufferStartsWith(buffer, readerIndex, "HEAD ") ||
bufferStartsWith(buffer, readerIndex, "DELETE ") ||
bufferStartsWith(buffer, readerIndex, "OPTIONS ") ||
bufferStartsWith(buffer, readerIndex, "PATCH ") ||
bufferStartsWith(buffer, readerIndex, "TRACE ")) {
throw new HttpOnTransportException("This is not a HTTP port");
}
// we have 6 readable bytes, show 4 (should be enough)
throw new StreamCorruptedException("invalid internal transport message format, got ("
+ Integer.toHexString(buffer.getByte(readerIndex) & 0xFF) + ","
+ Integer.toHexString(buffer.getByte(readerIndex + 1) & 0xFF) + ","
+ Integer.toHexString(buffer.getByte(readerIndex + 2) & 0xFF) + ","
+ Integer.toHexString(buffer.getByte(readerIndex + 3) & 0xFF) + ")");
}
int dataLen = buffer.getInt(buffer.readerIndex() + NettyHeader.MARKER_BYTES_SIZE);
if (dataLen == NettyHeader.PING_DATA_SIZE) {
// discard the messages we read and continue, this is achieved by skipping the bytes
// and returning null
buffer.skipBytes(sizeHeaderLength);
return null;
}
if (dataLen <= 0) {
throw new StreamCorruptedException("invalid data length: " + dataLen);
}
// safety against too large frames being sent
if (dataLen > NINETY_PER_HEAP_SIZE) {
throw new TooLongFrameException("transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded ["
+ new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
}
if (buffer.readableBytes() < dataLen + sizeHeaderLength) {
return null;
}
buffer.skipBytes(sizeHeaderLength);
return buffer;
}
private boolean bufferStartsWith(ChannelBuffer buffer, int readerIndex, String method) {
char[] chars = method.toCharArray();
for (int i = 0; i < chars.length; i++) {
if (buffer.getByte(readerIndex + i) != chars[i]) {
return false;
}
}
return true;
}
/**
* A helper exception to mark an incoming connection as potentially being HTTP
* so an appropriate error code can be returned
*/
public static class HttpOnTransportException extends ElasticsearchException {
public HttpOnTransportException(String msg) {
super(msg);
}
@Override
public RestStatus status() {
return RestStatus.BAD_REQUEST;
}
public HttpOnTransportException(StreamInput in) throws IOException {
super(in);
}
}
}
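The new decode() above delegates all header checks to TcpTransport.validateMessageHeader. Reconstructed from the removed code, the contract it relies on is roughly: false for a ping frame (skip and keep reading), true for a real message, exceptions for corrupt or oversized input, and IllegalStateException when too few bytes have arrived. A condensed sketch of that contract — illustrative, not the shipped method:

import java.io.StreamCorruptedException;

import org.jboss.netty.buffer.ChannelBuffer;

class HeaderContractSketch {
    // Illustrative only; the shipped checks live in TcpTransport.validateMessageHeader.
    static boolean validate(ChannelBuffer buffer) throws StreamCorruptedException {
        if (buffer.readableBytes() < 6) {
            // fewer bytes than marker + length: the caller should wait for more input
            throw new IllegalStateException("not enough bytes to read the size header");
        }
        int i = buffer.readerIndex();
        if (buffer.getByte(i) != 'E' || buffer.getByte(i + 1) != 'S') {
            throw new StreamCorruptedException("invalid internal transport message format");
        }
        int dataLen = buffer.getInt(i + 2);
        if (dataLen == -1) {
            return false; // ping frame: skip the six header bytes, dispatch nothing
        }
        if (dataLen <= 0) {
            throw new StreamCorruptedException("invalid data length: " + dataLen);
        }
        return true; // real message: the caller skips the header and processes the rest
    }
}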

View File

@@ -80,6 +80,7 @@ import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.ActionTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TcpTransport;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -763,7 +764,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(122, null);
ids.put(123, org.elasticsearch.indices.IndexAlreadyExistsException.class);
ids.put(124, org.elasticsearch.script.Script.ScriptParseException.class);
ids.put(125, org.elasticsearch.transport.netty.SizeHeaderFrameDecoder.HttpOnTransportException.class);
ids.put(125, TcpTransport.HttpOnTransportException.class);
ids.put(126, org.elasticsearch.index.mapper.MapperParsingException.class);
ids.put(127, org.elasticsearch.search.SearchContextException.class);
ids.put(128, org.elasticsearch.search.builder.SearchSourceBuilderException.class);

View File

@@ -162,20 +162,6 @@ public class ChannelsTests extends ESTestCase {
assertTrue("read bytes didn't match written bytes", sourceRef.equals(copyRef));
}
public void testWriteFromChannel() throws IOException {
int length = randomIntBetween(1, randomBytes.length / 2);
int offset = randomIntBetween(0, randomBytes.length - length);
ByteBuffer byteBuffer = ByteBuffer.wrap(randomBytes);
ChannelBuffer source = new ByteBufferBackedChannelBuffer(byteBuffer);
Channels.writeToChannel(source, offset, length, fileChannel);
BytesReference copyRef = new BytesArray(Channels.readFromFileChannel(fileChannel, 0, length));
BytesReference sourceRef = new BytesArray(randomBytes, offset, length);
assertTrue("read bytes didn't match written bytes", sourceRef.equals(copyRef));
}
class MockFileChannel extends FileChannel {
FileChannel delegate;

View File

@@ -357,7 +357,6 @@ public class LuceneTests extends ESTestCase {
dir.close();
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/19151")
public void testAsSequentialAccessBits() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(new KeywordAnalyzer()));
@@ -378,7 +377,7 @@ public class LuceneTests extends ESTestCase {
IndexSearcher searcher = newSearcher(reader);
Weight termWeight = new TermQuery(new Term("foo", "bar")).createWeight(searcher, false);
assertEquals(1, reader.leaves().size());
LeafReaderContext leafReaderContext = reader.leaves().get(0);
LeafReaderContext leafReaderContext = searcher.getIndexReader().leaves().get(0);
Bits bits = Lucene.asSequentialAccessBits(leafReaderContext.reader().maxDoc(), termWeight.scorer(leafReaderContext));
expectThrows(IndexOutOfBoundsException.class, () -> bits.get(-1));

View File

@@ -19,6 +19,7 @@
package org.elasticsearch.common.rounding;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.rounding.TimeZoneRounding.TimeIntervalRounding;
import org.elasticsearch.common.rounding.TimeZoneRounding.TimeUnitRounding;
import org.elasticsearch.common.unit.TimeValue;
@@ -31,10 +32,13 @@ import org.joda.time.DateTimeConstants;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -328,29 +332,70 @@ public class TimeZoneRoundingTests extends ESTestCase {
long interval = unit.toMillis(randomIntBetween(1, 365));
DateTimeZone tz = randomDateTimeZone();
TimeZoneRounding rounding = new TimeZoneRounding.TimeIntervalRounding(interval, tz);
long date = Math.abs(randomLong() % (2 * (long) 10e11)); // 1970-01-01T00:00:00Z - 2033-05-18T05:33:20.000+02:00
try {
final long roundedDate = rounding.round(date);
final long nextRoundingValue = rounding.nextRoundingValue(roundedDate);
assertThat("Rounding should be idempotent", roundedDate, equalTo(rounding.round(roundedDate)));
assertThat("Rounded value smaller or equal than unrounded", roundedDate, lessThanOrEqualTo(date));
assertThat("Values smaller than rounded value should round further down", rounding.round(roundedDate - 1),
lessThan(roundedDate));
long mainDate = Math.abs(randomLong() % (2 * (long) 10e11)); // 1970-01-01T00:00:00Z - 2033-05-18T05:33:20.000+02:00
if (randomBoolean()) {
mainDate = nastyDate(mainDate, tz, interval);
}
// check two intervals around date
long previousRoundedValue = Long.MIN_VALUE;
for (long date = mainDate - 2 * interval; date < mainDate + 2 * interval; date += interval / 2) {
try {
final long roundedDate = rounding.round(date);
final long nextRoundingValue = rounding.nextRoundingValue(roundedDate);
assertThat("Rounding should be idempotent", roundedDate, equalTo(rounding.round(roundedDate)));
assertThat("Rounded value smaller or equal than unrounded", roundedDate, lessThanOrEqualTo(date));
assertThat("Values smaller than rounded value should round further down", rounding.round(roundedDate - 1),
lessThan(roundedDate));
assertThat("Rounding should be >= previous rounding value", roundedDate, greaterThanOrEqualTo(previousRoundedValue));
if (tz.isFixed()) {
assertThat("NextRounding value should be greater than date", nextRoundingValue, greaterThan(roundedDate));
assertThat("NextRounding value should be interval from rounded value", nextRoundingValue - roundedDate,
equalTo(interval));
assertThat("NextRounding value should be a rounded date", nextRoundingValue,
equalTo(rounding.round(nextRoundingValue)));
if (tz.isFixed()) {
assertThat("NextRounding value should be greater than date", nextRoundingValue, greaterThan(roundedDate));
assertThat("NextRounding value should be interval from rounded value", nextRoundingValue - roundedDate,
equalTo(interval));
assertThat("NextRounding value should be a rounded date", nextRoundingValue,
equalTo(rounding.round(nextRoundingValue)));
}
previousRoundedValue = roundedDate;
} catch (AssertionError e) {
logger.error("Rounding error at {}, timezone {}, interval: {},", new DateTime(date, tz), tz, interval);
throw e;
}
} catch (AssertionError e) {
logger.error("Rounding error at {}, timezone {}, interval: {},", new DateTime(date, tz), tz, interval);
throw e;
}
}
}
/**
* Test that rounded values are always greater than or equal to the last rounded value if the date is increasing.
* The example covers an interval around 2011-10-30T02:10:00+01:00, time zone CET, interval: 2700000ms
*/
public void testIntervalRoundingMonotonic_CET() {
long interval = TimeUnit.MINUTES.toMillis(45);
DateTimeZone tz = DateTimeZone.forID("CET");
TimeZoneRounding rounding = new TimeZoneRounding.TimeIntervalRounding(interval, tz);
List<Tuple<String, String>> expectedDates = new ArrayList<Tuple<String, String>>();
// first date is the date to be rounded, second the expected result
expectedDates.add(new Tuple<>("2011-10-30T01:40:00.000+02:00", "2011-10-30T01:30:00.000+02:00"));
expectedDates.add(new Tuple<>("2011-10-30T02:02:30.000+02:00", "2011-10-30T01:30:00.000+02:00"));
expectedDates.add(new Tuple<>("2011-10-30T02:25:00.000+02:00", "2011-10-30T02:15:00.000+02:00"));
expectedDates.add(new Tuple<>("2011-10-30T02:47:30.000+02:00", "2011-10-30T02:15:00.000+02:00"));
expectedDates.add(new Tuple<>("2011-10-30T02:10:00.000+01:00", "2011-10-30T02:15:00.000+02:00"));
expectedDates.add(new Tuple<>("2011-10-30T02:32:30.000+01:00", "2011-10-30T02:15:00.000+01:00"));
expectedDates.add(new Tuple<>("2011-10-30T02:55:00.000+01:00", "2011-10-30T02:15:00.000+01:00"));
expectedDates.add(new Tuple<>("2011-10-30T03:17:30.000+01:00", "2011-10-30T03:00:00.000+01:00"));
long previousDate = Long.MIN_VALUE;
for (Tuple<String, String> dates : expectedDates) {
final long roundedDate = rounding.round(time(dates.v1()));
assertThat(roundedDate, isDate(time(dates.v2()), tz));
assertThat(roundedDate, greaterThanOrEqualTo(previousDate));
previousDate = roundedDate;
}
// here's what this means for interval widths
assertEquals(TimeUnit.MINUTES.toMillis(45), time("2011-10-30T02:15:00.000+02:00") - time("2011-10-30T01:30:00.000+02:00"));
assertEquals(TimeUnit.MINUTES.toMillis(60), time("2011-10-30T02:15:00.000+01:00") - time("2011-10-30T02:15:00.000+02:00"));
assertEquals(TimeUnit.MINUTES.toMillis(45), time("2011-10-30T03:00:00.000+01:00") - time("2011-10-30T02:15:00.000+01:00"));
}
/**
* special test for DST switch from #9491
*/

View File

@@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.transport.netty;
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.KeyedLock;

View File

@@ -98,7 +98,7 @@ public class ConfigurationUtilsTests extends ESTestCase {
public void testReadProcessors() throws Exception {
Processor processor = mock(Processor.class);
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
builder.registerProcessor("test_processor", (registry) -> config -> processor);
builder.registerProcessor("test_processor", (registry) -> (tag, config) -> processor);
ProcessorsRegistry registry = builder.build(mock(ScriptService.class), mock(ClusterService.class));

View File

@@ -40,7 +40,7 @@ public class PipelineFactoryTests extends ESTestCase {
public void testCreate() throws Exception {
Map<String, Object> processorConfig0 = new HashMap<>();
Map<String, Object> processorConfig1 = new HashMap<>();
processorConfig0.put(AbstractProcessorFactory.TAG_KEY, "first-processor");
processorConfig0.put(ConfigurationUtils.TAG_KEY, "first-processor");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY,

View File

@@ -58,7 +58,7 @@ public class PipelineStoreTests extends ESTestCase {
public void init() throws Exception {
store = new PipelineStore(Settings.EMPTY);
ProcessorsRegistry.Builder registryBuilder = new ProcessorsRegistry.Builder();
registryBuilder.registerProcessor("set", (registry) -> config -> {
registryBuilder.registerProcessor("set", (registry) -> (tag, config) -> {
String field = (String) config.remove("field");
String value = (String) config.remove("value");
return new Processor() {
@@ -78,7 +78,7 @@ public class PipelineStoreTests extends ESTestCase {
}
};
});
registryBuilder.registerProcessor("remove", (registry) -> config -> {
registryBuilder.registerProcessor("remove", (registry) -> (tag, config) -> {
String field = (String) config.remove("field");
return new Processor() {
@Override

View File

@@ -55,7 +55,7 @@ public class FileScriptTests extends ESTestCase {
.put("script.engine." + MockScriptEngine.NAME + ".file.aggs", "false").build();
ScriptService scriptService = makeScriptService(settings);
Script script = new Script("script1", ScriptService.ScriptType.FILE, MockScriptEngine.NAME, null);
CompiledScript compiledScript = scriptService.compile(script, ScriptContext.Standard.SEARCH, Collections.emptyMap(), null);
CompiledScript compiledScript = scriptService.compile(script, ScriptContext.Standard.SEARCH, Collections.emptyMap());
assertNotNull(compiledScript);
MockCompiledScript executable = (MockCompiledScript) compiledScript.compiled();
assertEquals("script1.mockscript", executable.name);
@@ -72,7 +72,7 @@ public class FileScriptTests extends ESTestCase {
Script script = new Script("script1", ScriptService.ScriptType.FILE, MockScriptEngine.NAME, null);
for (ScriptContext context : ScriptContext.Standard.values()) {
try {
scriptService.compile(script, context, Collections.emptyMap(), null);
scriptService.compile(script, context, Collections.emptyMap());
fail(context.getKey() + " script should have been rejected");
} catch(Exception e) {
assertTrue(e.getMessage(), e.getMessage().contains("scripts of type [file], operation [" + context.getKey() + "] and lang [" + MockScriptEngine.NAME + "] are disabled"));

View File

@@ -19,8 +19,13 @@
package org.elasticsearch.script;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@@ -30,13 +35,6 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
@@ -55,10 +53,9 @@ public class NativeScriptTests extends ESTestCase {
List<Setting<?>> scriptSettings = scriptModule.getSettings();
scriptSettings.add(InternalSettingsPlugin.VERSION_CREATED);
ClusterState state = ClusterState.builder(new ClusterName("_name")).build();
ExecutableScript executable = scriptModule.getScriptService().executable(
new Script("my", ScriptType.INLINE, NativeScriptEngineService.NAME, null), ScriptContext.Standard.SEARCH,
Collections.emptyMap(), state);
Collections.emptyMap());
assertThat(executable.run().toString(), equalTo("test"));
}
@@ -85,7 +82,7 @@ public class NativeScriptTests extends ESTestCase {
for (ScriptContext scriptContext : scriptContextRegistry.scriptContexts()) {
assertThat(scriptService.compile(new Script("my", ScriptType.INLINE, NativeScriptEngineService.NAME, null), scriptContext,
Collections.emptyMap(), null), notNullValue());
Collections.emptyMap()), notNullValue());
}
}

View File

@@ -59,7 +59,7 @@ public class ScriptContextTests extends ESTestCase {
for (ScriptService.ScriptType scriptType : ScriptService.ScriptType.values()) {
try {
Script script = new Script("1", scriptType, MockScriptEngine.NAME, null);
scriptService.compile(script, new ScriptContext.Plugin(PLUGIN_NAME, "custom_globally_disabled_op"), Collections.emptyMap(), null);
scriptService.compile(script, new ScriptContext.Plugin(PLUGIN_NAME, "custom_globally_disabled_op"), Collections.emptyMap());
fail("script compilation should have been rejected");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), containsString("scripts of type [" + scriptType + "], operation [" + PLUGIN_NAME + "_custom_globally_disabled_op] and lang [" + MockScriptEngine.NAME + "] are disabled"));
@@ -71,16 +71,16 @@ public class ScriptContextTests extends ESTestCase {
ScriptService scriptService = makeScriptService();
Script script = new Script("1", ScriptService.ScriptType.INLINE, MockScriptEngine.NAME, null);
try {
scriptService.compile(script, new ScriptContext.Plugin(PLUGIN_NAME, "custom_exp_disabled_op"), Collections.emptyMap(), null);
scriptService.compile(script, new ScriptContext.Plugin(PLUGIN_NAME, "custom_exp_disabled_op"), Collections.emptyMap());
fail("script compilation should have been rejected");
} catch (IllegalStateException e) {
assertTrue(e.getMessage(), e.getMessage().contains("scripts of type [inline], operation [" + PLUGIN_NAME + "_custom_exp_disabled_op] and lang [" + MockScriptEngine.NAME + "] are disabled"));
}
// still works for other script contexts
assertNotNull(scriptService.compile(script, ScriptContext.Standard.AGGS, Collections.emptyMap(), null));
assertNotNull(scriptService.compile(script, ScriptContext.Standard.SEARCH, Collections.emptyMap(), null));
assertNotNull(scriptService.compile(script, new ScriptContext.Plugin(PLUGIN_NAME, "custom_op"), Collections.emptyMap(), null));
assertNotNull(scriptService.compile(script, ScriptContext.Standard.AGGS, Collections.emptyMap()));
assertNotNull(scriptService.compile(script, ScriptContext.Standard.SEARCH, Collections.emptyMap()));
assertNotNull(scriptService.compile(script, new ScriptContext.Plugin(PLUGIN_NAME, "custom_op"), Collections.emptyMap()));
}
public void testUnknownPluginScriptContext() throws Exception {
@@ -88,7 +88,7 @@ public class ScriptContextTests extends ESTestCase {
for (ScriptService.ScriptType scriptType : ScriptService.ScriptType.values()) {
try {
Script script = new Script("1", scriptType, MockScriptEngine.NAME, null);
scriptService.compile(script, new ScriptContext.Plugin(PLUGIN_NAME, "unknown"), Collections.emptyMap(), null);
scriptService.compile(script, new ScriptContext.Plugin(PLUGIN_NAME, "unknown"), Collections.emptyMap());
fail("script compilation should have been rejected");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage(), e.getMessage().contains("script context [" + PLUGIN_NAME + "_unknown] not supported"));
@@ -107,7 +107,7 @@ public class ScriptContextTests extends ESTestCase {
for (ScriptService.ScriptType scriptType : ScriptService.ScriptType.values()) {
try {
Script script = new Script("1", scriptType, MockScriptEngine.NAME, null);
scriptService.compile(script, context, Collections.emptyMap(), null);
scriptService.compile(script, context, Collections.emptyMap());
fail("script compilation should have been rejected");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage(), e.getMessage().contains("script context [test] not supported"));

View File

@@ -114,9 +114,10 @@ public class ScriptServiceTests extends ESTestCase {
private void buildScriptService(Settings additionalSettings) throws IOException {
Settings finalSettings = Settings.builder().put(baseSettings).put(additionalSettings).build();
Environment environment = new Environment(finalSettings);
// TODO:
scriptService = new ScriptService(finalSettings, environment, resourceWatcherService, scriptEngineRegistry, scriptContextRegistry, scriptSettings) {
@Override
String getScriptFromClusterState(ClusterState state, String scriptLang, String id) {
String getScriptFromClusterState(String scriptLang, String id) {
//mock the script that gets retrieved from an index
return "100";
}
@@ -141,7 +142,7 @@ public class ScriptServiceTests extends ESTestCase {
resourceWatcherService.notifyNow();
CompiledScript compiledScript = scriptService.compile(new Script("test_script", ScriptType.FILE, "test", null),
ScriptContext.Standard.SEARCH, Collections.emptyMap(), emptyClusterState());
ScriptContext.Standard.SEARCH, Collections.emptyMap());
assertThat(compiledScript.compiled(), equalTo((Object) "compiled_test_file"));
Files.delete(testFileNoExt);
@@ -150,7 +151,7 @@ public class ScriptServiceTests extends ESTestCase {
try {
scriptService.compile(new Script("test_script", ScriptType.FILE, "test", null), ScriptContext.Standard.SEARCH,
Collections.emptyMap(), emptyClusterState());
Collections.emptyMap());
fail("the script test_script should no longer exist");
} catch (IllegalArgumentException ex) {
assertThat(ex.getMessage(), containsString("Unable to find on disk file script [test_script] using lang [test]"));
@@ -168,7 +169,7 @@ public class ScriptServiceTests extends ESTestCase {
resourceWatcherService.notifyNow();
CompiledScript compiledScript = scriptService.compile(new Script("file_script", ScriptType.FILE, "test", null),
ScriptContext.Standard.SEARCH, Collections.emptyMap(), emptyClusterState());
ScriptContext.Standard.SEARCH, Collections.emptyMap());
assertThat(compiledScript.compiled(), equalTo((Object) "compiled_test_file_script"));
Files.delete(testHiddenFile);
@@ -179,9 +180,9 @@ public class ScriptServiceTests extends ESTestCase {
public void testInlineScriptCompiledOnceCache() throws IOException {
buildScriptService(Settings.EMPTY);
CompiledScript compiledScript1 = scriptService.compile(new Script("1+1", ScriptType.INLINE, "test", null),
randomFrom(scriptContexts), Collections.emptyMap(), emptyClusterState());
randomFrom(scriptContexts), Collections.emptyMap());
CompiledScript compiledScript2 = scriptService.compile(new Script("1+1", ScriptType.INLINE, "test", null),
randomFrom(scriptContexts), Collections.emptyMap(), emptyClusterState());
randomFrom(scriptContexts), Collections.emptyMap());
assertThat(compiledScript1.compiled(), sameInstance(compiledScript2.compiled()));
}
@@ -304,7 +305,7 @@ public class ScriptServiceTests extends ESTestCase {
String type = scriptEngineService.getType();
try {
scriptService.compile(new Script("test", randomFrom(ScriptType.values()), type, null), new ScriptContext.Plugin(
pluginName, unknownContext), Collections.emptyMap(), emptyClusterState());
pluginName, unknownContext), Collections.emptyMap());
fail("script compilation should have been rejected");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("script context [" + pluginName + "_" + unknownContext + "] not supported"));
@@ -314,22 +315,20 @@ public class ScriptServiceTests extends ESTestCase {
public void testCompileCountedInCompilationStats() throws IOException {
buildScriptService(Settings.EMPTY);
scriptService.compile(new Script("1+1", ScriptType.INLINE, "test", null), randomFrom(scriptContexts),
Collections.emptyMap(), emptyClusterState());
Collections.emptyMap());
assertEquals(1L, scriptService.stats().getCompilations());
}
public void testExecutableCountedInCompilationStats() throws IOException {
buildScriptService(Settings.EMPTY);
ClusterState state = ClusterState.builder(new ClusterName("_name")).build();
scriptService.executable(new Script("1+1", ScriptType.INLINE, "test", null), randomFrom(scriptContexts), Collections.emptyMap(), state);
scriptService.executable(new Script("1+1", ScriptType.INLINE, "test", null), randomFrom(scriptContexts), Collections.emptyMap());
assertEquals(1L, scriptService.stats().getCompilations());
}
public void testSearchCountedInCompilationStats() throws IOException {
buildScriptService(Settings.EMPTY);
ClusterState state = ClusterState.builder(new ClusterName("_name")).build();
scriptService.search(null, new Script("1+1", ScriptType.INLINE, "test", null), randomFrom(scriptContexts),
Collections.emptyMap(), state);
Collections.emptyMap());
assertEquals(1L, scriptService.stats().getCompilations());
}
@@ -339,7 +338,7 @@ public class ScriptServiceTests extends ESTestCase {
for (int i = 0; i < numberOfCompilations; i++) {
scriptService
.compile(new Script(i + " + " + i, ScriptType.INLINE, "test", null), randomFrom(scriptContexts),
Collections.emptyMap(), emptyClusterState());
Collections.emptyMap());
}
assertEquals(numberOfCompilations, scriptService.stats().getCompilations());
}
@@ -349,9 +348,8 @@ public class ScriptServiceTests extends ESTestCase {
builder.put(ScriptService.SCRIPT_CACHE_SIZE_SETTING.getKey(), 1);
builder.put("script.inline", "true");
buildScriptService(builder.build());
ClusterState state = ClusterState.builder(new ClusterName("_name")).build();
scriptService.executable(new Script("1+1", ScriptType.INLINE, "test", null), randomFrom(scriptContexts), Collections.emptyMap(), state);
scriptService.executable(new Script("1+1", ScriptType.INLINE, "test", null), randomFrom(scriptContexts), Collections.emptyMap(), state);
scriptService.executable(new Script("1+1", ScriptType.INLINE, "test", null), randomFrom(scriptContexts), Collections.emptyMap());
scriptService.executable(new Script("1+1", ScriptType.INLINE, "test", null), randomFrom(scriptContexts), Collections.emptyMap());
assertEquals(1L, scriptService.stats().getCompilations());
}
@@ -359,14 +357,14 @@ public class ScriptServiceTests extends ESTestCase {
buildScriptService(Settings.EMPTY);
createFileScripts("test");
scriptService.compile(new Script("file_script", ScriptType.FILE, "test", null), randomFrom(scriptContexts),
Collections.emptyMap(), emptyClusterState());
Collections.emptyMap());
assertEquals(1L, scriptService.stats().getCompilations());
}
public void testIndexedScriptCountedInCompilationStats() throws IOException {
buildScriptService(Settings.EMPTY);
scriptService.compile(new Script("script", ScriptType.STORED, "test", null), randomFrom(scriptContexts),
Collections.emptyMap(), emptyClusterState());
Collections.emptyMap());
assertEquals(1L, scriptService.stats().getCompilations());
}
@@ -375,9 +373,8 @@ public class ScriptServiceTests extends ESTestCase {
builder.put(ScriptService.SCRIPT_CACHE_SIZE_SETTING.getKey(), 1);
builder.put("script.inline", "true");
buildScriptService(builder.build());
ClusterState state = ClusterState.builder(new ClusterName("_name")).build();
scriptService.executable(new Script("1+1", ScriptType.INLINE, "test", null), randomFrom(scriptContexts), Collections.emptyMap(), state);
scriptService.executable(new Script("2+2", ScriptType.INLINE, "test", null), randomFrom(scriptContexts), Collections.emptyMap(), state);
scriptService.executable(new Script("1+1", ScriptType.INLINE, "test", null), randomFrom(scriptContexts), Collections.emptyMap());
scriptService.executable(new Script("2+2", ScriptType.INLINE, "test", null), randomFrom(scriptContexts), Collections.emptyMap());
assertEquals(2L, scriptService.stats().getCompilations());
assertEquals(1L, scriptService.stats().getCacheEvictions());
}
@@ -388,7 +385,7 @@ public class ScriptServiceTests extends ESTestCase {
builder.put("script.inline", "true");
buildScriptService(builder.build());
CompiledScript script = scriptService.compile(new Script("1 + 1", ScriptType.INLINE, null, null),
randomFrom(scriptContexts), Collections.emptyMap(), emptyClusterState());
randomFrom(scriptContexts), Collections.emptyMap());
assertEquals(script.lang(), "test");
}
@@ -469,7 +466,7 @@ public class ScriptServiceTests extends ESTestCase {
private void assertCompileRejected(String lang, String script, ScriptType scriptType, ScriptContext scriptContext) {
try {
scriptService.compile(new Script(script, scriptType, lang, null), scriptContext, Collections.emptyMap(), emptyClusterState());
scriptService.compile(new Script(script, scriptType, lang, null), scriptContext, Collections.emptyMap());
fail("compile should have been rejected for lang [" + lang + "], script_type [" + scriptType + "], scripted_op [" + scriptContext + "]");
} catch(IllegalStateException e) {
//all good
@@ -477,9 +474,8 @@ public class ScriptServiceTests extends ESTestCase {
}
private void assertCompileAccepted(String lang, String script, ScriptType scriptType, ScriptContext scriptContext) {
ClusterState state = emptyClusterState();
assertThat(
scriptService.compile(new Script(script, scriptType, lang, null), scriptContext, Collections.emptyMap(), state),
scriptService.compile(new Script(script, scriptType, lang, null), scriptContext, Collections.emptyMap()),
notNullValue()
);
}
@@ -528,8 +524,6 @@ public class ScriptServiceTests extends ESTestCase {
public static final String NAME = "dtest";
public static final List<String> EXTENSIONS = Collections.unmodifiableList(Arrays.asList("dtest"));
@Override
public String getType() {
return NAME;
@@ -559,9 +553,4 @@ public class ScriptServiceTests extends ESTestCase {
public void close() {
}
}
private static ClusterState emptyClusterState() {
return ClusterState.builder(new ClusterName("_name")).build();
}
}

View File

@@ -100,7 +100,7 @@ public abstract class AbstractSortTestCase<T extends SortBuilder<T>> extends EST
scriptService = new ScriptService(baseSettings, environment,
new ResourceWatcherService(baseSettings, null), scriptEngineRegistry, scriptContextRegistry, scriptSettings) {
@Override
public CompiledScript compile(Script script, ScriptContext scriptContext, Map<String, String> params, ClusterState state) {
public CompiledScript compile(Script script, ScriptContext scriptContext, Map<String, String> params) {
return new CompiledScript(ScriptType.INLINE, "mockName", "test", script);
}
};

View File

@ -324,16 +324,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testVoidMessageCompressed() {
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<TransportRequest.Empty>() {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
try {
TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
channel.sendResponse(TransportResponse.Empty.INSTANCE, responseOptions);
} catch (IOException e) {
logger.error("Unexpected failure", e);
fail(e.getMessage());
}
(request, channel) -> {
try {
TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
channel.sendResponse(TransportResponse.Empty.INSTANCE, responseOptions);
} catch (IOException e) {
logger.error("Unexpected failure", e);
fail(e.getMessage());
}
});

View File

@ -17,13 +17,11 @@
* under the License.
*/
package org.elasticsearch.transport.netty;
package org.elasticsearch.transport;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.TransportSettings;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@ -32,11 +30,11 @@ import java.util.List;
import static java.net.InetAddress.getByName;
import static java.util.Arrays.asList;
import static org.elasticsearch.transport.netty.NettyTransport.resolvePublishPort;
import static org.elasticsearch.transport.TcpTransport.resolvePublishPort;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class NettyPublishPortTests extends ESTestCase {
public class PublishPortTests extends ESTestCase {
public void testPublishPort() throws Exception {
int boundPort = randomIntBetween(9000, 9100);

View File

@ -17,17 +17,17 @@
* under the License.
*/
package org.elasticsearch.transport.netty;
package org.elasticsearch.transport;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
/** Unit tests for NettyTransport */
public class NettyTransportTests extends ESTestCase {
/** Unit tests for TCPTransport */
public class TCPTransportTests extends ESTestCase {
/** Test ipv4 host with a default port works */
public void testParseV4DefaultPort() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1", "1234", Integer.MAX_VALUE);
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1", "1234", Integer.MAX_VALUE);
assertEquals(1, addresses.length);
assertEquals("127.0.0.1", addresses[0].getAddress());
@ -36,19 +36,19 @@ public class NettyTransportTests extends ESTestCase {
/** Test ipv4 host with a default port range works */
public void testParseV4DefaultRange() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1", "1234-1235", Integer.MAX_VALUE);
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1", "1234-1235", Integer.MAX_VALUE);
assertEquals(2, addresses.length);
assertEquals("127.0.0.1", addresses[0].getAddress());
assertEquals(1234, addresses[0].getPort());
assertEquals("127.0.0.1", addresses[1].getAddress());
assertEquals(1235, addresses[1].getPort());
}
/** Test ipv4 host with port works */
public void testParseV4WithPort() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1:2345", "1234", Integer.MAX_VALUE);
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1:2345", "1234", Integer.MAX_VALUE);
assertEquals(1, addresses.length);
assertEquals("127.0.0.1", addresses[0].getAddress());
@ -57,7 +57,7 @@ public class NettyTransportTests extends ESTestCase {
/** Test ipv4 host with port range works */
public void testParseV4WithPortRange() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1:2345-2346", "1234", Integer.MAX_VALUE);
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1:2345-2346", "1234", Integer.MAX_VALUE);
assertEquals(2, addresses.length);
assertEquals("127.0.0.1", addresses[0].getAddress());
@ -70,7 +70,7 @@ public class NettyTransportTests extends ESTestCase {
/** Test unbracketed ipv6 hosts in configuration fail. Leave no ambiguity */
public void testParseV6UnBracketed() throws Exception {
try {
NettyTransport.parse("::1", "1234", Integer.MAX_VALUE);
TcpTransport.parse("::1", "1234", Integer.MAX_VALUE);
fail("should have gotten exception");
} catch (IllegalArgumentException expected) {
assertTrue(expected.getMessage().contains("must be bracketed"));
@ -79,7 +79,7 @@ public class NettyTransportTests extends ESTestCase {
/** Test ipv6 host with a default port works */
public void testParseV6DefaultPort() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("[::1]", "1234", Integer.MAX_VALUE);
TransportAddress[] addresses = TcpTransport.parse("[::1]", "1234", Integer.MAX_VALUE);
assertEquals(1, addresses.length);
assertEquals("::1", addresses[0].getAddress());
@ -88,19 +88,19 @@ public class NettyTransportTests extends ESTestCase {
/** Test ipv6 host with a default port range works */
public void testParseV6DefaultRange() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("[::1]", "1234-1235", Integer.MAX_VALUE);
TransportAddress[] addresses = TcpTransport.parse("[::1]", "1234-1235", Integer.MAX_VALUE);
assertEquals(2, addresses.length);
assertEquals("::1", addresses[0].getAddress());
assertEquals(1234, addresses[0].getPort());
assertEquals("::1", addresses[1].getAddress());
assertEquals(1235, addresses[1].getPort());
}
/** Test ipv6 host with port works */
public void testParseV6WithPort() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("[::1]:2345", "1234", Integer.MAX_VALUE);
TransportAddress[] addresses = TcpTransport.parse("[::1]:2345", "1234", Integer.MAX_VALUE);
assertEquals(1, addresses.length);
assertEquals("::1", addresses[0].getAddress());
@ -109,7 +109,7 @@ public class NettyTransportTests extends ESTestCase {
/** Test ipv6 host with port range works */
public void testParseV6WithPortRange() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("[::1]:2345-2346", "1234", Integer.MAX_VALUE);
TransportAddress[] addresses = TcpTransport.parse("[::1]:2345-2346", "1234", Integer.MAX_VALUE);
assertEquals(2, addresses.length);
assertEquals("::1", addresses[0].getAddress());
@ -118,10 +118,10 @@ public class NettyTransportTests extends ESTestCase {
assertEquals("::1", addresses[1].getAddress());
assertEquals(2346, addresses[1].getPort());
}
/** Test per-address limit */
public void testAddressLimit() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("[::1]:100-200", "1000", 3);
TransportAddress[] addresses = TcpTransport.parse("[::1]:100-200", "1000", 3);
assertEquals(3, addresses.length);
assertEquals(100, addresses[0].getPort());
assertEquals(101, addresses[1].getPort());

View File

@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.netty;
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.transport.netty.NettyUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

View File

@ -19,7 +19,6 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasables;
@ -33,6 +32,7 @@ import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@ -56,7 +56,7 @@ public class NettyScheduledPingTests extends ESTestCase {
ThreadPool threadPool = new TestThreadPool(getClass().getName());
Settings settings = Settings.builder()
.put(NettyTransport.PING_SCHEDULE.getKey(), "5ms")
.put(TcpTransport.PING_SCHEDULE.getKey(), "5ms")
.put(TransportSettings.PORT.getKey(), 0)
.put("cluster.name", "test")
.build();
@ -89,12 +89,12 @@ public class NettyScheduledPingTests extends ESTestCase {
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(nettyA.scheduledPing.successfulPings.count(), greaterThan(100L));
assertThat(nettyB.scheduledPing.successfulPings.count(), greaterThan(100L));
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L));
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L));
}
});
assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0L));
assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0L));
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<TransportRequest.Empty>() {
@ -137,15 +137,12 @@ public class NettyScheduledPingTests extends ESTestCase {
}).txGet();
}
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(nettyA.scheduledPing.successfulPings.count(), greaterThan(200L));
assertThat(nettyB.scheduledPing.successfulPings.count(), greaterThan(200L));
}
assertBusy(() -> {
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(200L));
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(200L));
});
assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0L));
assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0L));
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
Releasables.close(serviceA, serviceB);
terminate(threadPool);

View File

@ -44,6 +44,7 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
@ -98,45 +99,24 @@ public class NettyTransportIT extends ESIntegTestCase {
super(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
}
protected String handleRequest(Channel channel, String profileName,
StreamInput stream, long requestId, int messageLengthBytes, Version version,
InetSocketAddress remoteAddress) throws IOException {
String action = super.handleRequest(channel, profileName, stream, requestId, messageLengthBytes, version,
remoteAddress);
channelProfileName = TransportSettings.DEFAULT_PROFILE;
return action;
}
@Override
public ChannelPipelineFactory configureServerChannelPipelineFactory(String name, Settings groupSettings) {
return new ErrorPipelineFactory(this, name, groupSettings);
}
private static class ErrorPipelineFactory extends ServerChannelPipelineFactory {
private final ESLogger logger;
public ErrorPipelineFactory(ExceptionThrowingNettyTransport nettyTransport, String name, Settings groupSettings) {
super(nettyTransport, name, groupSettings);
this.logger = nettyTransport.logger;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = super.getPipeline();
pipeline.replace("dispatcher", "dispatcher",
new MessageChannelHandler(nettyTransport, logger, TransportSettings.DEFAULT_PROFILE) {
@Override
protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId,
int messageLengthBytes, Version version) throws IOException {
String action = super.handleRequest(channel, marker, buffer, requestId, messageLengthBytes, version);
channelProfileName = this.profileName;
return action;
}
@Override
protected void validateRequest(Marker marker, StreamInput buffer, long requestId, String action) throws IOException {
super.validateRequest(marker, buffer, requestId, action);
String error = threadPool.getThreadContext().getHeader("ERROR");
if (error != null) {
throw new ElasticsearchException(error);
}
}
});
return pipeline;
protected void validateRequest(StreamInput buffer, long requestId, String action)
throws IOException {
super.validateRequest(buffer, requestId, action);
String error = threadPool.getThreadContext().getHeader("ERROR");
if (error != null) {
throw new ElasticsearchException(error);
}
}
}
}

View File

@ -18,7 +18,6 @@
*/
package org.elasticsearch.transport.netty;
import org.elasticsearch.Version;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
@ -30,6 +29,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings;
import org.junit.Before;
@ -58,7 +58,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
.build();
ThreadPool threadPool = new TestThreadPool("tst");
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
assertEquals(1, transport.profileBoundAddresses().size());
assertEquals(1, transport.boundAddress().boundAddresses().length);
} finally {
@ -74,7 +74,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
.build();
ThreadPool threadPool = new TestThreadPool("tst");
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
assertEquals(1, transport.profileBoundAddresses().size());
assertEquals(1, transport.boundAddress().boundAddresses().length);
} finally {
@ -91,7 +91,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
.build();
ThreadPool threadPool = new TestThreadPool("tst");
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
assertEquals(0, transport.profileBoundAddresses().size());
assertEquals(1, transport.boundAddress().boundAddresses().length);
} finally {
@ -107,7 +107,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
.build();
ThreadPool threadPool = new TestThreadPool("tst");
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
assertEquals(0, transport.profileBoundAddresses().size());
assertEquals(1, transport.boundAddress().boundAddresses().length);
} finally {
@ -125,7 +125,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
.build();
ThreadPool threadPool = new TestThreadPool("tst");
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
assertEquals(0, transport.profileBoundAddresses().size());
assertEquals(1, transport.boundAddress().boundAddresses().length);
} finally {
@ -133,14 +133,13 @@ public class NettyTransportMultiPortTests extends ESTestCase {
}
}
private NettyTransport startNettyTransport(Settings settings, ThreadPool threadPool) {
private TcpTransport<?> startTransport(Settings settings, ThreadPool threadPool) {
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays,
TcpTransport<?> transport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays,
new NamedWriteableRegistry(), new NoneCircuitBreakerService());
nettyTransport.start();
transport.start();
assertThat(nettyTransport.lifecycleState(), is(Lifecycle.State.STARTED));
return nettyTransport;
assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED));
return transport;
}
}

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.netty;
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -27,7 +27,6 @@ import org.elasticsearch.test.ESTestCase;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.CompositeChannelBuffer;
import org.junit.Before;
import java.io.IOException;

View File

@ -63,7 +63,7 @@ DEFAULT_PLUGINS = ["analysis-icu",
"analysis-phonetic",
"analysis-smartcn",
"analysis-stempel",
"discovery-azure",
"discovery-azure-classic",
"discovery-ec2",
"discovery-gce",
"ingest-attachment",

View File

@ -1,9 +1,13 @@
[[discovery-azure]]
=== Azure Discovery Plugin
[[discovery-azure-classic]]
=== Azure Classic Discovery Plugin
The Azure Discovery plugin uses the Azure API for unicast discovery.
The Azure Classic Discovery plugin uses the Azure Classic API for unicast discovery.
[[discovery-azure-install]]
// TODO: Link to ARM plugin when ready
// See issue https://github.com/elastic/elasticsearch/issues/19146
deprecated[5.0.0, Use the upcoming Azure ARM Discovery plugin instead]
[[discovery-azure-classic-install]]
[float]
==== Installation
@ -11,13 +15,13 @@ This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install discovery-azure
sudo bin/elasticsearch-plugin install discovery-azure-classic
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
[[discovery-azure-remove]]
[[discovery-azure-classic-remove]]
[float]
==== Removal
@ -25,12 +29,12 @@ The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove discovery-azure
sudo bin/elasticsearch-plugin remove discovery-azure-classic
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[discovery-azure-usage]]
[[discovery-azure-classic-usage]]
==== Azure Virtual Machine Discovery
Azure VM discovery allows you to use the Azure APIs to perform automatic discovery (similar to multicast in non-hostile
@ -64,7 +68,7 @@ You can use {ref}/modules-network.html[core network host settings]. For example
==============================================
[[discovery-azure-short]]
[[discovery-azure-classic-short]]
===== How to start (short story)
* Create Azure instances
@ -73,7 +77,7 @@ You can use {ref}/modules-network.html[core network host settings]. For example
* Modify `elasticsearch.yml` file
* Start Elasticsearch
[[discovery-azure-settings]]
[[discovery-azure-classic-settings]]
===== Azure credential API settings
The following is a list of settings that can further control the credential API:
@ -100,7 +104,7 @@ The following are a list of settings that can further control the credential API
your_azure_cloud_service_name
[[discovery-azure-settings-advanced]]
[[discovery-azure-classic-settings-advanced]]
===== Advanced settings
The following is a list of settings that can further control the discovery:
@ -143,7 +147,7 @@ discovery:
slot: production
----
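Putting these settings together, a minimal `elasticsearch.yml` for Azure Classic discovery might look like the sketch below. This is an illustration only, not part of this commit; the subscription id, service name, and keystore path are placeholders, and the setting names assume the `cloud.azure.management.*` namespace used by the plugin.
[source,sh]
----------------------------------------------------------------
# Hypothetical sketch: append Azure Classic discovery settings to elasticsearch.yml
cat >> /etc/elasticsearch/elasticsearch.yml <<'EOF'
cloud:
    azure:
        management:
            subscription.id: your_subscription_id
            cloud.service.name: your_azure_cloud_service_name
            keystore:
                path: /path/to/azurekeystore.pkcs12
                password: your_password
discovery:
    type: azure
EOF
----------------------------------------------------------------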
[[discovery-azure-long]]
[[discovery-azure-classic-long]]
==== Setup process for Azure Discovery
We describe here one strategy, which is to hide our Elasticsearch cluster from the outside.
@ -153,7 +157,7 @@ other. That means that with this mode, you can use elasticsearch unicast
discovery to build a cluster, using the Azure API to retrieve information
about your nodes.
[[discovery-azure-long-prerequisites]]
[[discovery-azure-classic-long-prerequisites]]
===== Prerequisites
Before starting, you need to have:
@ -243,7 +247,7 @@ azure account download
azure account import /tmp/azure.publishsettings
----
[[discovery-azure-long-instance]]
[[discovery-azure-classic-long-instance]]
===== Creating your first instance
You need to have a storage account available. Check http://www.windowsazure.com/en-us/develop/net/how-to-guides/blob-storage/#create-account[Azure Blob Storage documentation]
@ -396,7 +400,7 @@ This command should give you a JSON result:
}
----
[[discovery-azure-long-plugin]]
[[discovery-azure-classic-long-plugin]]
===== Install elasticsearch cloud azure plugin
[source,sh]
@ -405,7 +409,7 @@ This command should give you a JSON result:
sudo service elasticsearch stop
# Install the plugin
sudo /usr/share/elasticsearch/bin/elasticsearch-plugin install discovery-azure
sudo /usr/share/elasticsearch/bin/elasticsearch-plugin install discovery-azure-classic
# Configure it
sudo vi /etc/elasticsearch/elasticsearch.yml
@ -441,7 +445,7 @@ sudo service elasticsearch start
If anything goes wrong, check your logs in `/var/log/elasticsearch`.
[[discovery-azure-scale]]
[[discovery-azure-classic-scale]]
==== Scaling Out!
You first need to create an image of your previous machine.

View File

@ -13,9 +13,9 @@ The core discovery plugins are:
The EC2 discovery plugin uses the https://github.com/aws/aws-sdk-java[AWS API] for unicast discovery.
<<discovery-azure,Azure discovery>>::
<<discovery-azure-classic,Azure Classic discovery>>::
The Azure discovery plugin uses the Azure API for unicast discovery.
The Azure Classic discovery plugin uses the Azure Classic API for unicast discovery.
<<discovery-gce,GCE discovery>>::
@ -33,7 +33,7 @@ A number of discovery plugins have been contributed by our community:
include::discovery-ec2.asciidoc[]
include::discovery-azure.asciidoc[]
include::discovery-azure-classic.asciidoc[]
include::discovery-gce.asciidoc[]

View File

@ -24,7 +24,7 @@ The `cloud-aws` plugin has been split into two separate plugins:
The `cloud-azure` plugin has been split into two separate plugins:
* <<discovery-azure>> (`discovery-azure`)
* <<discovery-azure-classic>> (`discovery-azure-classic`)
* <<repository-azure>> (`repository-azure`)

View File

@ -6,14 +6,9 @@ The `plugins` command provides a view per node of running plugins. This informat
[source,sh]
------------------------------------------------------------------------------
% curl 'localhost:9200/_cat/plugins?v'
name component version type isolation url
Abraxas discovery-azure 2.1.0-SNAPSHOT j x
Abraxas lang-javascript 2.0.0-SNAPSHOT j x
Abraxas marvel NA j/s x /_plugin/marvel/
Abraxas lang-python 2.0.0-SNAPSHOT j x
Abraxas inquisitor NA s /_plugin/inquisitor/
Abraxas kopf 0.5.2 s /_plugin/kopf/
Abraxas segmentspy NA s /_plugin/segmentspy/
name component version description
Abraxas discovery-gce 5.0.0 The Google Compute Engine (GCE) Discovery plugin allows to use GCE API for the unicast discovery mechanism.
Abraxas lang-javascript 5.0.0 The JavaScript language plugin allows to have javascript as the language of scripts to execute.
-------------------------------------------------------------------------------
We can tell quickly how many plugins per node we have and which versions.

View File

@ -54,6 +54,10 @@ conflict if the document changes between the time when the snapshot was taken
and when the delete request is processed. When the versions match the document
is deleted.
NOTE: Since `internal` versioning does not support the value 0 as a valid
version number, documents with version equal to zero cannot be deleted using
`_delete_by_query` and will fail the request.
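As an illustration (the index name and query are placeholders, not from this commit), a delete-by-query request looks like the following; any matching document whose version is 0, or whose version changed since the snapshot was taken, is reported as a failure:
[source,sh]
----------------------------------------------------------------
# Delete every document matching the query; conflicting or version-0 docs fail
curl -XPOST 'localhost:9200/twitter/_delete_by_query' -d '{
  "query": {
    "match": {
      "user": "kimchy"
    }
  }
}'
----------------------------------------------------------------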
During the `_delete_by_query` execution, multiple search requests are sequentially
executed in order to find all the matching documents to delete. Every time a batch
of documents is found, a corresponding bulk request is executed to delete all

View File

@ -119,6 +119,14 @@ indexed and the new version number used. If the value provided is less
than or equal to the stored document's version number, a version
conflict will occur and the index operation will fail.
WARNING: External versioning supports the value 0 as a valid version number.
This allows the version to be in sync with an external versioning system
where version numbers start from zero instead of one. It has the side effect
that documents with a version number equal to zero can neither be updated
using the <<docs-update-by-query,Update-By-Query API>> nor deleted using the
<<docs-delete-by-query,Delete By Query API>>.
A nice side effect is that there is no need to maintain strict ordering
of async indexing operations executed as a result of changes to a source
database, as long as version numbers from the source database are used.
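For example, indexing with an external version of 0 is accepted (index, type, and id here are placeholders):
[source,sh]
----------------------------------------------------------------
# The stored version becomes 0, mirroring the external system
curl -XPUT 'localhost:9200/twitter/tweet/1?version=0&version_type=external' -d '{
  "message" : "trying out external versioning"
}'
----------------------------------------------------------------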

View File

@ -46,6 +46,10 @@ conflict if the document changes between the time when the snapshot was taken
and when the index request is processed. When the versions match the document
is updated and the version number is incremented.
NOTE: Since `internal` versioning does not support the value 0 as a valid
version number, documents with version equal to zero cannot be updated using
`_update_by_query` and will fail the request.
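As a sketch (the index name is a placeholder), the simplest update-by-query touches every document and bumps its version; passing `conflicts=proceed` counts version conflicts instead of aborting, which connects to the failure semantics described next:
[source,sh]
----------------------------------------------------------------
# Increment the version of every document; conflicts are tallied, not fatal
curl -XPOST 'localhost:9200/twitter/_update_by_query?conflicts=proceed'
----------------------------------------------------------------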
All update and query failures cause the `_update_by_query` to abort and are
returned in the `failures` of the response. The updates that have been
performed still stick. In other words, the process is not rolled back, only

View File

@ -1,7 +1,7 @@
[[elasticsearch-reference]]
= Elasticsearch Reference
:version: 5.0.0-alpha3
:version: 5.0.0-alpha4
:major-version: 5.x
:branch: master
:jdk: 1.8.0_73

View File

@ -63,7 +63,7 @@ Proxy settings for both plugins have been renamed:
The Cloud Azure plugin has been split into three plugins:
* {plugins}/discovery-azure.html[Discovery Azure plugin]
* {plugins}/discovery-azure-classic.html[Discovery Azure plugin]
* {plugins}/repository-azure.html[Repository Azure plugin]
* {plugins}/store-smb.html[Store SMB plugin]

View File

@ -1,5 +1,5 @@
[[modules-discovery-azure]]
=== Azure Discovery
[[modules-discovery-azure-classic]]
=== Azure Classic Discovery
Azure discovery allows to use the Azure APIs to perform automatic discovery (similar to multicast).
It is available as a plugin. See {plugins}/discovery-azure.html[discovery-azure] for more information.
Azure classic discovery allows you to use the Azure Classic APIs to perform automatic discovery (similar to multicast).
It is available as a plugin. See {plugins}/discovery-azure-classic.html[discovery-azure-classic] for more information.

View File

@ -83,9 +83,16 @@ First, each document is scored by the defined functions. The parameter
`max`:: maximum score is used
`min`:: minimum score is used
Because scores can be on different scales (for example, between 0 and 1 for decay functions but arbitrary for `field_value_factor`) and also because sometimes a different impact of functions on the score is desirable, the score of each function can be adjusted with a user defined `weight` (). The `weight` can be defined per function in the `functions` array (example above) and is multiplied with the score computed by the respective function.
Because scores can be on different scales (for example, between 0 and 1 for decay functions but arbitrary for `field_value_factor`) and also
because sometimes a different impact of functions on the score is desirable, the score of each function can be adjusted with a user defined
`weight`. The `weight` can be defined per function in the `functions` array (example above) and is multiplied with the score computed by
the respective function.
If weight is given without any other function declaration, `weight` acts as a function that simply returns the `weight`.
In case `score_mode` is set to `avg` the individual scores will be combined by a **weighted** average.
For example, if two functions return score 1 and 2 and their respective weights are 3 and 4, then their scores will be combined as
`(1*3+2*4)/(3+4)` and **not** `(1*3+2*4)/2`.
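A structural sketch of such a query (the filters and weights are illustrative, not from this commit): two weighted functions combined with `score_mode` set to `avg`:
[source,sh]
----------------------------------------------------------------
# Two weighted functions; their scores are combined as a weighted average
curl -XGET 'localhost:9200/_search' -d '{
  "query": {
    "function_score": {
      "query": { "match_all": {} },
      "score_mode": "avg",
      "functions": [
        { "filter": { "term": { "tag": "a" } }, "weight": 3 },
        { "filter": { "term": { "tag": "b" } }, "weight": 4 }
      ]
    }
  }
}'
----------------------------------------------------------------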
The new score can be restricted to not exceed a certain limit by setting
the `max_boost` parameter. The default for `max_boost` is FLT_MAX.

View File

@ -699,7 +699,9 @@ The meaning of the stats are as follows:
This is not currently used and will always report `0`. Currently aggregation profiling only times the shard level parts of the aggregation execution. Timing of the reduce phase will be added later.
=== Performance Notes
=== Profiling Considerations
==== Performance Notes
Like any profiler, the Profile API introduces a non-negligible overhead to search execution. The act of instrumenting
low-level method calls such as `collect`, `advance` and `next_doc` can be fairly expensive, since these methods are called
@ -710,7 +712,7 @@ There are also cases where special Lucene optimizations are disabled, since they
could cause some queries to report larger relative times than their non-profiled counterparts, but in general should
not have a drastic effect compared to other components in the profiled query.
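For reference, profiling is enabled per request with a single top-level flag; a minimal example (index and query are placeholders):
[source,sh]
----------------------------------------------------------------
# Setting "profile": true adds a profile section to each shard's response
curl -XGET 'localhost:9200/test/_search' -d '{
  "profile": true,
  "query": { "match": { "message": "some text" } }
}'
----------------------------------------------------------------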
=== Limitations
==== Limitations
- Profiling statistics are currently not available for suggestions, highlighting, `dfs_query_then_fetch`
- Profiling of the reduce phase of aggregation is currently not available

View File

@ -26,7 +26,7 @@ setting, as follows:
[source,sh]
-------------------------------
./bin/elasticsearch -Ees.path.conf=/path/to/my/config/
./bin/elasticsearch -Epath.conf=/path/to/my/config/
-------------------------------
[float]

View File

@ -20,9 +20,9 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import java.util.Map;
@ -53,7 +53,7 @@ abstract class AbstractStringProcessor extends AbstractProcessor {
protected abstract String process(String value);
static abstract class Factory<T extends AbstractStringProcessor> extends AbstractProcessorFactory<T> {
static abstract class Factory implements Processor.Factory {
protected final String processorType;
protected Factory(String processorType) {
@ -61,11 +61,11 @@ abstract class AbstractStringProcessor extends AbstractProcessor {
}
@Override
public T doCreate(String processorTag, Map<String, Object> config) throws Exception {
public AbstractStringProcessor create(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(processorType, processorTag, config, "field");
return newProcessor(processorTag, field);
}
protected abstract T newProcessor(String processorTag, String field);
protected abstract AbstractStringProcessor newProcessor(String processorTag, String field);
}
}

View File

@ -20,9 +20,10 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.ValueSource;
@ -64,7 +65,7 @@ public final class AppendProcessor extends AbstractProcessor {
return TYPE;
}
public static final class Factory extends AbstractProcessorFactory<AppendProcessor> {
public static final class Factory implements Processor.Factory {
private final TemplateService templateService;
@ -73,7 +74,7 @@ public final class AppendProcessor extends AbstractProcessor {
}
@Override
public AppendProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
public AppendProcessor create(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
return new AppendProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService));

View File

@ -20,9 +20,9 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import java.util.ArrayList;
import java.util.List;
@ -160,9 +160,9 @@ public final class ConvertProcessor extends AbstractProcessor {
return TYPE;
}
public static final class Factory extends AbstractProcessorFactory<ConvertProcessor> {
public static final class Factory implements Processor.Factory {
@Override
public ConvertProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
public ConvertProcessor create(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String typeProperty = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "type");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", field);

View File

@ -21,9 +21,9 @@ package org.elasticsearch.ingest.common;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
@ -120,10 +120,10 @@ public final class DateIndexNameProcessor extends AbstractProcessor {
return dateFormats;
}
public static final class Factory extends AbstractProcessorFactory<DateIndexNameProcessor> {
public static final class Factory implements Processor.Factory {
@Override
protected DateIndexNameProcessor doCreate(String tag, Map<String, Object> config) throws Exception {
public DateIndexNameProcessor create(String tag, Map<String, Object> config) throws Exception {
String localeString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "locale");
String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "timezone");
DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString);

View File

@ -21,9 +21,9 @@ package org.elasticsearch.ingest.common;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;
@ -108,10 +108,10 @@ public final class DateProcessor extends AbstractProcessor {
return formats;
}
public static final class Factory extends AbstractProcessorFactory<DateProcessor> {
public static final class Factory implements Processor.Factory {
@SuppressWarnings("unchecked")
public DateProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
public DateProcessor create(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET_FIELD);
String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "timezone");

View File

@ -20,9 +20,9 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import java.util.Map;
@ -56,7 +56,7 @@ public final class FailProcessor extends AbstractProcessor {
return TYPE;
}
public static final class Factory extends AbstractProcessorFactory<FailProcessor> {
public static final class Factory implements Processor.Factory {
private final TemplateService templateService;
@ -65,7 +65,7 @@ public final class FailProcessor extends AbstractProcessor {
}
@Override
public FailProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
public FailProcessor create(String processorTag, Map<String, Object> config) throws Exception {
String message = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "message");
return new FailProcessor(processorTag, templateService.compile(message));
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
@ -83,7 +82,7 @@ public final class ForEachProcessor extends AbstractProcessor {
return processors;
}
public static final class Factory extends AbstractProcessorFactory<ForEachProcessor> {
public static final class Factory implements Processor.Factory {
private final ProcessorsRegistry processorRegistry;
@ -92,7 +91,7 @@ public final class ForEachProcessor extends AbstractProcessor {
}
@Override
protected ForEachProcessor doCreate(String tag, Map<String, Object> config) throws Exception {
public ForEachProcessor create(String tag, Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, tag, config, "field");
List<Map<String, Map<String, Object>>> processorConfigs = readList(TYPE, tag, config, "processors");
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorRegistry);

View File

@ -20,9 +20,9 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import java.util.HashMap;
import java.util.List;
@ -114,7 +114,7 @@ public final class GrokProcessor extends AbstractProcessor {
return combinedPattern;
}
public final static class Factory extends AbstractProcessorFactory<GrokProcessor> {
public final static class Factory implements Processor.Factory {
private final Map<String, String> builtinPatterns;
@ -123,7 +123,7 @@ public final class GrokProcessor extends AbstractProcessor {
}
@Override
public GrokProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
public GrokProcessor create(String processorTag, Map<String, Object> config) throws Exception {
String matchField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
List<String> matchPatterns = ConfigurationUtils.readList(TYPE, processorTag, config, "patterns");
boolean traceMatch = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "trace_match", false);

View File

@ -20,8 +20,8 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import java.util.Map;
import java.util.regex.Matcher;
@ -78,9 +78,9 @@ public final class GsubProcessor extends AbstractProcessor {
return TYPE;
}
public static final class Factory extends AbstractProcessorFactory<GsubProcessor> {
public static final class Factory implements Processor.Factory {
@Override
public GsubProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
public GsubProcessor create(String processorTag, Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, processorTag, config, "field");
String pattern = readStringProperty(TYPE, processorTag, config, "pattern");
String replacement = readStringProperty(TYPE, processorTag, config, "replacement");

View File

@ -60,7 +60,7 @@ public class IngestCommonPlugin extends Plugin {
nodeModule.registerProcessor(SortProcessor.TYPE, (registry) -> new SortProcessor.Factory());
nodeModule.registerProcessor(GrokProcessor.TYPE, (registry) -> new GrokProcessor.Factory(builtinPatterns));
nodeModule.registerProcessor(ScriptProcessor.TYPE, (registry) ->
new ScriptProcessor.Factory(registry.getScriptService(), registry.getClusterService()));
new ScriptProcessor.Factory(registry.getScriptService()));
}
// Code for loading built-in grok patterns packaged with the jar file:

View File

@ -20,9 +20,9 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import java.util.List;
import java.util.Map;
@ -70,9 +70,9 @@ public final class JoinProcessor extends AbstractProcessor {
return TYPE;
}
public final static class Factory extends AbstractProcessorFactory<JoinProcessor> {
public final static class Factory implements Processor.Factory {
@Override
public JoinProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
public JoinProcessor create(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String separator = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator");
return new JoinProcessor(processorTag, field, separator);

Some files were not shown because too many files have changed in this diff.