CouchDB River: Add throttling when indexing does not keep up with fetching _changes, closes #1269.

This commit is contained in:
Shay Banon 2011-08-22 22:18:04 +03:00
parent bb9bcf6a1e
commit 265b9f0369

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.io.Closeables;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.util.concurrent.jsr166y.TransferQueue;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.common.xcontent.support.XContentMapValues;
@ -54,6 +53,8 @@ import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.util.Map; import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.Requests.*; import static org.elasticsearch.client.Requests.*;
@ -79,6 +80,7 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
private final String typeName; private final String typeName;
private final int bulkSize; private final int bulkSize;
private final TimeValue bulkTimeout; private final TimeValue bulkTimeout;
private final int throttleSize;
private final ExecutableScript script; private final ExecutableScript script;
@ -86,7 +88,7 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
private volatile Thread indexerThread; private volatile Thread indexerThread;
private volatile boolean closed; private volatile boolean closed;
private final TransferQueue<String> stream = new LinkedTransferQueue<String>(); private final BlockingQueue<String> stream;
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
@Inject public CouchdbRiver(RiverName riverName, RiverSettings settings, @RiverIndexName String riverIndexName, Client client, ScriptService scriptService) { @Inject public CouchdbRiver(RiverName riverName, RiverSettings settings, @RiverIndexName String riverIndexName, Client client, ScriptService scriptService) {
@ -147,11 +149,18 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
} else { } else {
bulkTimeout = TimeValue.timeValueMillis(10); bulkTimeout = TimeValue.timeValueMillis(10);
} }
throttleSize = XContentMapValues.nodeIntegerValue(indexSettings.get("throttle_size"), bulkSize * 5);
} else { } else {
indexName = couchDb; indexName = couchDb;
typeName = couchDb; typeName = couchDb;
bulkSize = 100; bulkSize = 100;
bulkTimeout = TimeValue.timeValueMillis(10); bulkTimeout = TimeValue.timeValueMillis(10);
throttleSize = bulkSize * 5;
}
if (throttleSize == -1) {
stream = new LinkedTransferQueue<String>();
} else {
stream = new LinkedBlockingQueue<String>(throttleSize);
} }
} }
@ -405,7 +414,8 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("[couchdb] {}", line); logger.trace("[couchdb] {}", line);
} }
stream.add(line); // we put here, so we block if there is no space to add
stream.put(line);
} }
} catch (Exception e) { } catch (Exception e) {
Closeables.closeQuietly(is); Closeables.closeQuietly(is);