diff --git a/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java b/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java index 93b5a2be7ff..e722b6ff546 100644 --- a/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java +++ b/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java @@ -32,7 +32,6 @@ import org.elasticsearch.common.io.Closeables; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; -import org.elasticsearch.common.util.concurrent.jsr166y.TransferQueue; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -54,6 +53,8 @@ import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import static org.elasticsearch.client.Requests.*; @@ -79,6 +80,7 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { private final String typeName; private final int bulkSize; private final TimeValue bulkTimeout; + private final int throttleSize; private final ExecutableScript script; @@ -86,7 +88,7 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { private volatile Thread indexerThread; private volatile boolean closed; - private final TransferQueue stream = new LinkedTransferQueue(); + private final BlockingQueue stream; @SuppressWarnings({"unchecked"}) @Inject public CouchdbRiver(RiverName riverName, RiverSettings settings, @RiverIndexName String riverIndexName, Client client, ScriptService scriptService) { @@ -147,11 +149,18 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { } else { bulkTimeout = TimeValue.timeValueMillis(10); } + throttleSize = XContentMapValues.nodeIntegerValue(indexSettings.get("throttle_size"), bulkSize * 5); } else { indexName = couchDb; typeName = couchDb; bulkSize = 100; bulkTimeout = TimeValue.timeValueMillis(10); + throttleSize = bulkSize * 5; + } + if (throttleSize == -1) { + stream = new LinkedTransferQueue(); + } else { + stream = new LinkedBlockingQueue(throttleSize); } } @@ -405,7 +414,8 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { if (logger.isTraceEnabled()) { logger.trace("[couchdb] {}", line); } - stream.add(line); + // we put here, so we block if there is no space to add + stream.put(line); } } catch (Exception e) { Closeables.closeQuietly(is);