From ed9d9aa358229ab67f89491ce2615bc1835afeb8 Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 14 Oct 2010 17:57:52 +0200 Subject: [PATCH] CouchDB River: Allow defining a JavaScript script that can munge the changes stream, closes #431. --- .../river/couchdb/CouchdbRiver.java | 43 +++++++++++++++---- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java b/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java index 5aeed466168..6066f28f9be 100644 --- a/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java +++ b/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java @@ -26,6 +26,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.action.bulk.BulkRequestBuilder; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.Base64; +import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -36,6 +37,8 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.river.*; +import org.elasticsearch.script.ExecutableScript; +import org.elasticsearch.script.ScriptService; import java.io.*; import java.net.HttpURLConnection; @@ -68,13 +71,16 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { private final int bulkSize; private final TimeValue bulkTimeout; + private final ExecutableScript script; + private final Map scriptParams = Maps.newHashMap(); + private volatile Thread slurperThread; private volatile Thread indexerThread; private volatile boolean closed; private final TransferQueue stream = new
LinkedTransferQueue(); - @Inject public CouchdbRiver(RiverName riverName, RiverSettings settings, @RiverIndexName String riverIndexName, Client client) { + @Inject public CouchdbRiver(RiverName riverName, RiverSettings settings, @RiverIndexName String riverIndexName, Client client, ScriptService scriptService) { super(riverName, settings); this.riverIndexName = riverIndexName; this.client = client; @@ -106,6 +112,12 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { } else { basicAuth = null; } + + if (couchSettings.containsKey("script")) { + script = scriptService.executable("js", couchSettings.get("script").toString(), Maps.newHashMap()); + } else { + script = null; + } } else { couchHost = "localhost"; couchPort = 5984; @@ -113,6 +125,7 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { couchFilter = null; couchFilterParamsUrl = null; basicAuth = null; + script = null; } if (settings.settings().containsKey("index")) { @@ -166,19 +179,19 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { } private String processLine(String s, BulkRequestBuilder bulk) { - Map map = null; + Map ctx; try { - map = XContentFactory.xContent(XContentType.JSON).createParser(s).mapAndClose(); + ctx = XContentFactory.xContent(XContentType.JSON).createParser(s).mapAndClose(); } catch (IOException e) { logger.warn("failed to parse {}", e, s); return null; } - if (map.containsKey("error")) { + if (ctx.containsKey("error")) { logger.warn("received error {}", s); return null; } - String seq = map.get("seq").toString(); - String id = map.get("id").toString(); + String seq = ctx.get("seq").toString(); + String id = ctx.get("id").toString(); // Ignore design documents if (id.startsWith("_design/")) { @@ -186,10 +199,22 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { return seq; } - if (map.containsKey("deleted") && map.get("deleted").equals(Boolean.TRUE)) { + if (script != null) { + 
scriptParams.put("ctx", ctx); + try { + script.run(scriptParams); + } catch (Exception e) { + logger.warn("failed to script process {}, ignoring", e, ctx); + return seq; + } + } + + if (ctx.containsKey("ignore") && ctx.get("ignore").equals(Boolean.TRUE)) { + // ignore doc + } else if (ctx.containsKey("deleted") && ctx.get("deleted").equals(Boolean.TRUE)) { bulk.add(deleteRequest(indexName).type(typeName).id(id)); - } else if (map.containsKey("doc")) { - Map doc = (Map) map.get("doc"); + } else if (ctx.containsKey("doc")) { + Map doc = (Map) ctx.get("doc"); bulk.add(indexRequest(indexName).type(typeName).id(id).source(doc)); } else { logger.warn("ignoring unknown change {}", s);