CouchDB River: Support couchdb filter query parameters, closes #389.

This commit is contained in:
kimchy 2010-09-30 20:47:05 +02:00
parent 06c7c4a9ac
commit 735ad0d43d
1 changed files with 26 additions and 5 deletions

View File

@ -36,12 +36,10 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.*; import org.elasticsearch.river.*;
import java.io.BufferedReader; import java.io.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.net.URLEncoder;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -61,6 +59,7 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
private final int couchPort; private final int couchPort;
private final String couchDb; private final String couchDb;
private final String couchFilter; private final String couchFilter;
private final String couchFilterParamsUrl;
private final String indexName; private final String indexName;
private final String typeName; private final String typeName;
@ -84,11 +83,26 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
couchPort = XContentMapValues.nodeIntegerValue(couchSettings.get("port"), 5984); couchPort = XContentMapValues.nodeIntegerValue(couchSettings.get("port"), 5984);
couchDb = XContentMapValues.nodeStringValue(couchSettings.get("db"), riverName.name()); couchDb = XContentMapValues.nodeStringValue(couchSettings.get("db"), riverName.name());
couchFilter = XContentMapValues.nodeStringValue(couchSettings.get("filter"), null); couchFilter = XContentMapValues.nodeStringValue(couchSettings.get("filter"), null);
if (couchSettings.containsKey("filter_params")) {
Map<String, Object> filterParams = (Map<String, Object>) couchSettings.get("filter_params");
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, Object> entry : filterParams.entrySet()) {
try {
sb.append("&").append(URLEncoder.encode(entry.getKey(), "UTF-8")).append(URLEncoder.encode(entry.getValue().toString(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
// should not happen...
}
}
couchFilterParamsUrl = sb.toString();
} else {
couchFilterParamsUrl = null;
}
} else { } else {
couchHost = "localhost"; couchHost = "localhost";
couchPort = 5984; couchPort = 5984;
couchDb = "db"; couchDb = "db";
couchFilter = null; couchFilter = null;
couchFilterParamsUrl = null;
} }
if (settings.settings().containsKey("index")) { if (settings.settings().containsKey("index")) {
@ -261,7 +275,14 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
String file = "/" + couchDb + "/_changes?feed=continuous&include_docs=true&heartbeat=10000"; String file = "/" + couchDb + "/_changes?feed=continuous&include_docs=true&heartbeat=10000";
if (couchFilter != null) { if (couchFilter != null) {
file = file + "&filter=" + couchFilter; try {
file = file + "&filter=" + URLEncoder.encode(couchFilter, "UTF-8");
} catch (UnsupportedEncodingException e) {
// should not happen!
}
if (couchFilterParamsUrl != null) {
file = file + couchFilterParamsUrl;
}
} }
if (lastSeq != null) { if (lastSeq != null) {
file = file + "&since=" + lastSeq; file = file + "&since=" + lastSeq;