From 735ad0d43d1a35aaae108868e25b84c3721418f7 Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 30 Sep 2010 20:47:05 +0200 Subject: [PATCH] CouchDB River: Support couchdb filter query parameters, closes #389. --- .../river/couchdb/CouchdbRiver.java | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java b/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java index 71e552377c0..2773edc77ce 100644 --- a/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java +++ b/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java @@ -36,12 +36,10 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.river.*; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; +import java.io.*; import java.net.HttpURLConnection; import java.net.URL; +import java.net.URLEncoder; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -61,6 +59,7 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { private final int couchPort; private final String couchDb; private final String couchFilter; + private final String couchFilterParamsUrl; private final String indexName; private final String typeName; @@ -84,11 +83,26 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { couchPort = XContentMapValues.nodeIntegerValue(couchSettings.get("port"), 5984); couchDb = XContentMapValues.nodeStringValue(couchSettings.get("db"), riverName.name()); couchFilter = XContentMapValues.nodeStringValue(couchSettings.get("filter"), null); + if (couchSettings.containsKey("filter_params")) { + Map filterParams = (Map) couchSettings.get("filter_params"); + StringBuilder sb = new StringBuilder(); + for (Map.Entry entry : filterParams.entrySet()) { + try { + sb.append("&").append(URLEncoder.encode(entry.getKey(), "UTF-8")).append(URLEncoder.encode(entry.getValue().toString(), "UTF-8")); + } catch (UnsupportedEncodingException e) { + // should not happen... + } + } + couchFilterParamsUrl = sb.toString(); + } else { + couchFilterParamsUrl = null; + } } else { couchHost = "localhost"; couchPort = 5984; couchDb = "db"; couchFilter = null; + couchFilterParamsUrl = null; } if (settings.settings().containsKey("index")) { @@ -261,7 +275,14 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { String file = "/" + couchDb + "/_changes?feed=continuous&include_docs=true&heartbeat=10000"; if (couchFilter != null) { - file = file + "&filter=" + couchFilter; + try { + file = file + "&filter=" + URLEncoder.encode(couchFilter, "UTF-8"); + } catch (UnsupportedEncodingException e) { + // should not happen! + } + if (couchFilterParamsUrl != null) { + file = file + couchFilterParamsUrl; + } } if (lastSeq != null) { file = file + "&since=" + lastSeq;