From ccb3d217816df4f1331d70b2e737c6c01bc9c46a Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 4 Sep 2014 20:10:18 +0200 Subject: [PATCH] Bulk UDP: Removal. This feature is rarely used. Removing it will help reduce the moving parts of Elasticsearch and focus on the core. Close #7595 --- docs/reference/docs.asciidoc | 3 - docs/reference/docs/bulk-udp.asciidoc | 57 ----- .../elasticsearch/bulk/udp/BulkUdpModule.java | 32 --- .../bulk/udp/BulkUdpService.java | 221 ------------------ .../node/internal/InternalNode.java | 7 - 5 files changed, 320 deletions(-) delete mode 100644 docs/reference/docs/bulk-udp.asciidoc delete mode 100644 src/main/java/org/elasticsearch/bulk/udp/BulkUdpModule.java delete mode 100644 src/main/java/org/elasticsearch/bulk/udp/BulkUdpService.java diff --git a/docs/reference/docs.asciidoc b/docs/reference/docs.asciidoc index 469bc196c29..0686de866ad 100644 --- a/docs/reference/docs.asciidoc +++ b/docs/reference/docs.asciidoc @@ -15,7 +15,6 @@ This section describes the following CRUD APIs: .Multi-document APIs * <> * <> -* <> * <> NOTE: All CRUD APIs are single-index APIs. The `index` parameter accepts a single @@ -37,8 +36,6 @@ include::docs/bulk.asciidoc[] include::docs/delete-by-query.asciidoc[] -include::docs/bulk-udp.asciidoc[] - include::docs/termvectors.asciidoc[] include::docs/multi-termvectors.asciidoc[] diff --git a/docs/reference/docs/bulk-udp.asciidoc b/docs/reference/docs/bulk-udp.asciidoc deleted file mode 100644 index 74565a396a0..00000000000 --- a/docs/reference/docs/bulk-udp.asciidoc +++ /dev/null @@ -1,57 +0,0 @@ -[[docs-bulk-udp]] -== Bulk UDP API - -A Bulk UDP service is a service listening over UDP for bulk format -requests. The idea is to provide a low latency UDP service that allows -to easily index data that is not of critical nature. - -The Bulk UDP service is disabled by default, but can be enabled by -setting `bulk.udp.enabled` to `true`. - -The bulk UDP service performs internal bulk aggregation of the data and -then flushes it based on several parameters: - -`bulk.udp.bulk_actions`:: - The number of actions to flush a bulk after, - defaults to `1000`. - -`bulk.udp.bulk_size`:: - The size of the current bulk request to flush - the request once exceeded, defaults to `5mb`. - -`bulk.udp.flush_interval`:: - An interval after which the current - request is flushed, regardless of the above limits. Defaults to `5s`. -`bulk.udp.concurrent_requests`:: - The number on max in flight bulk - requests allowed. Defaults to `4`. - -The allowed network settings are: - -`bulk.udp.host`:: - The host to bind to, defaults to `network.host` - which defaults to any. - -`bulk.udp.port`:: - The port to use, defaults to `9700-9800`. - -`bulk.udp.receive_buffer_size`:: - The receive buffer size, defaults to `10mb`. - -Here is an example of how it can be used: - -[source,js] --------------------------------------------------- -> cat bulk.txt -{ "index" : { "_index" : "test", "_type" : "type1" } } -{ "field1" : "value1" } -{ "index" : { "_index" : "test", "_type" : "type1" } } -{ "field1" : "value1" } --------------------------------------------------- - -[source,js] --------------------------------------------------- -> cat bulk.txt | nc -w 0 -u localhost 9700 --------------------------------------------------- - - diff --git a/src/main/java/org/elasticsearch/bulk/udp/BulkUdpModule.java b/src/main/java/org/elasticsearch/bulk/udp/BulkUdpModule.java deleted file mode 100644 index d195b56beee..00000000000 --- a/src/main/java/org/elasticsearch/bulk/udp/BulkUdpModule.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.bulk.udp; - -import org.elasticsearch.common.inject.AbstractModule; - -/** - */ -public class BulkUdpModule extends AbstractModule { - - @Override - protected void configure() { - bind(BulkUdpService.class).asEagerSingleton(); - } -} diff --git a/src/main/java/org/elasticsearch/bulk/udp/BulkUdpService.java b/src/main/java/org/elasticsearch/bulk/udp/BulkUdpService.java deleted file mode 100644 index d211932d4a4..00000000000 --- a/src/main/java/org/elasticsearch/bulk/udp/BulkUdpService.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.bulk.udp; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.bytes.ChannelBufferBytesReference; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.network.NetworkService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.PortsRange; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.jboss.netty.bootstrap.ConnectionlessBootstrap; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.*; -import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; - -import java.io.IOException; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; - -/** - */ -public class BulkUdpService extends AbstractLifecycleComponent { - - private final Client client; - private final NetworkService networkService; - - private final boolean enabled; - - final String host; - final String port; - - final ByteSizeValue receiveBufferSize; - final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory; - final int bulkActions; - final ByteSizeValue bulkSize; - final TimeValue flushInterval; - final int concurrentRequests; - - private BulkProcessor bulkProcessor; - private ConnectionlessBootstrap bootstrap; - private Channel channel; - - @Inject - public BulkUdpService(Settings settings, Client client, NetworkService networkService) { - super(settings); - this.client = client; - this.networkService = networkService; - - this.host = componentSettings.get("host"); - this.port = componentSettings.get("port", "9700-9800"); - - this.bulkActions = componentSettings.getAsInt("bulk_actions", 1000); - this.bulkSize = componentSettings.getAsBytesSize("bulk_size", new ByteSizeValue(5, ByteSizeUnit.MB)); - this.flushInterval = componentSettings.getAsTime("flush_interval", TimeValue.timeValueSeconds(5)); - this.concurrentRequests = componentSettings.getAsInt("concurrent_requests", 4); - - this.receiveBufferSize = componentSettings.getAsBytesSize("receive_buffer_size", new ByteSizeValue(10, ByteSizeUnit.MB)); - this.receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory(componentSettings.getAsBytesSize("receive_predictor_size", receiveBufferSize).bytesAsInt()); - - this.enabled = componentSettings.getAsBoolean("enabled", false); - - logger.debug("using enabled [{}], host [{}], port [{}], bulk_actions [{}], bulk_size [{}], flush_interval [{}], concurrent_requests [{}]", - enabled, host, port, bulkActions, bulkSize, flushInterval, concurrentRequests); - } - - @Override - protected void doStart() throws ElasticsearchException { - if (!enabled) { - return; - } - bulkProcessor = BulkProcessor.builder(client, new BulkListener()) - .setBulkActions(bulkActions) - .setBulkSize(bulkSize) - .setFlushInterval(flushInterval) - .setConcurrentRequests(concurrentRequests) - .build(); - - - bootstrap = new ConnectionlessBootstrap(new NioDatagramChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, "bulk_udp_worker")))); - - bootstrap.setOption("receiveBufferSize", receiveBufferSize.bytesAsInt()); - bootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); - - // Enable broadcast - bootstrap.setOption("broadcast", "false"); - - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline(new Handler()); - } - }); - - - InetAddress hostAddressX; - try { - hostAddressX = networkService.resolveBindHostAddress(host); - } catch (IOException e) { - logger.warn("failed to resolve host {}", e, host); - return; - } - final InetAddress hostAddress = hostAddressX; - - PortsRange portsRange = new PortsRange(port); - final AtomicReference lastException = new AtomicReference<>(); - boolean success = portsRange.iterate(new PortsRange.PortCallback() { - @Override - public boolean onPortNumber(int portNumber) { - try { - channel = bootstrap.bind(new InetSocketAddress(hostAddress, portNumber)); - } catch (Exception e) { - lastException.set(e); - return false; - } - return true; - } - }); - if (!success) { - logger.warn("failed to bind to {}/{}", lastException.get(), hostAddress, port); - return; - } - - logger.info("address {}", channel.getLocalAddress()); - } - - @Override - protected void doStop() throws ElasticsearchException { - if (!enabled) { - return; - } - if (channel != null) { - channel.close().awaitUninterruptibly(); - } - if (bootstrap != null) { - bootstrap.releaseExternalResources(); - } - bulkProcessor.close(); - } - - @Override - protected void doClose() throws ElasticsearchException { - } - - class Handler extends SimpleChannelUpstreamHandler { - - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); - logger.trace("received message size [{}]", buffer.readableBytes()); - try { - bulkProcessor.add(new ChannelBufferBytesReference(buffer), false, null, null); - } catch (Exception e1) { - logger.warn("failed to execute bulk request", e1); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { - if (e.getCause() instanceof BindException) { - // ignore, this happens when we retry binding to several ports, its fine if we fail... - return; - } - logger.warn("failure caught", e.getCause()); - } - } - - class BulkListener implements BulkProcessor.Listener { - - @Override - public void beforeBulk(long executionId, BulkRequest request) { - if (logger.isTraceEnabled()) { - logger.trace("[{}] executing [{}]/[{}]", executionId, request.numberOfActions(), new ByteSizeValue(request.estimatedSizeInBytes())); - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (logger.isTraceEnabled()) { - logger.trace("[{}] executed [{}]/[{}], took [{}]", executionId, request.numberOfActions(), new ByteSizeValue(request.estimatedSizeInBytes()), response.getTook()); - } - if (response.hasFailures()) { - logger.warn("[{}] failed to execute bulk request: {}", executionId, response.buildFailureMessage()); - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable e) { - logger.warn("[{}] failed to execute bulk request", e, executionId); - } - } -} diff --git a/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 9dc25da6546..a6976515106 100644 --- a/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -24,8 +24,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.bench.BenchmarkModule; -import org.elasticsearch.bulk.udp.BulkUdpModule; -import org.elasticsearch.bulk.udp.BulkUdpService; import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.cache.recycler.PageCacheRecyclerModule; import org.elasticsearch.client.Client; @@ -183,7 +181,6 @@ public final class InternalNode implements Node { modules.add(new MonitorModule(settings)); modules.add(new GatewayModule(settings)); modules.add(new NodeClientModule()); - modules.add(new BulkUdpModule()); modules.add(new ShapeModule()); modules.add(new PercolatorModule()); modules.add(new ResourceWatcherModule()); @@ -251,7 +248,6 @@ public final class InternalNode implements Node { if (settings.getAsBoolean("http.enabled", true)) { injector.getInstance(HttpServer.class).start(); } - injector.getInstance(BulkUdpService.class).start(); injector.getInstance(ResourceWatcherService.class).start(); injector.getInstance(TribeService.class).start(); @@ -269,7 +265,6 @@ public final class InternalNode implements Node { logger.info("stopping ..."); injector.getInstance(TribeService.class).stop(); - injector.getInstance(BulkUdpService.class).stop(); injector.getInstance(ResourceWatcherService.class).stop(); if (settings.getAsBoolean("http.enabled", true)) { injector.getInstance(HttpServer.class).stop(); @@ -323,8 +318,6 @@ public final class InternalNode implements Node { StopWatch stopWatch = new StopWatch("node_close"); stopWatch.start("tribe"); injector.getInstance(TribeService.class).close(); - stopWatch.stop().start("bulk.udp"); - injector.getInstance(BulkUdpService.class).close(); stopWatch.stop().start("http"); if (settings.getAsBoolean("http.enabled", true)) { injector.getInstance(HttpServer.class).close();