Allow setting _index and _type with CouchDB river, closes #1219.

This commit is contained in:
Shay Banon 2011-08-09 15:53:13 +03:00
parent 7af84869fc
commit eef4ec2f5c

View File

@ -37,11 +37,19 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.*;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import java.io.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
@ -214,22 +222,46 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
if (ctx.containsKey("ignore") && ctx.get("ignore").equals(Boolean.TRUE)) {
// ignore dock
} else if (ctx.containsKey("deleted") && ctx.get("deleted").equals(Boolean.TRUE)) {
String index = extractIndex(ctx);
String type = extractType(ctx);
if (logger.isTraceEnabled()) {
logger.trace("processing [delete]: [{}]/[{}]/[{}]", indexName, typeName, id);
logger.trace("processing [delete]: [{}]/[{}]/[{}]", index, type, id);
}
bulk.add(deleteRequest(indexName).type(typeName).id(id));
bulk.add(deleteRequest(index).type(type).id(id).routing(extractRouting(ctx)));
} else if (ctx.containsKey("doc")) {
String index = extractIndex(ctx);
String type = extractType(ctx);
Map<String, Object> doc = (Map<String, Object>) ctx.get("doc");
if (logger.isTraceEnabled()) {
logger.trace("processing [index ]: [{}]/[{}]/[{}], source {}", indexName, typeName, id, doc);
logger.trace("processing [index ]: [{}]/[{}]/[{}], source {}", index, type, id, doc);
}
bulk.add(indexRequest(indexName).type(typeName).id(id).source(doc));
bulk.add(indexRequest(index).type(type).id(id).source(doc).routing(extractRouting(ctx)));
} else {
logger.warn("ignoring unknown change {}", s);
}
return seq;
}
private String extractRouting(Map<String, Object> ctx) {
return (String) ctx.get("_routing");
}
private String extractType(Map<String, Object> ctx) {
String type = (String) ctx.get("_type");
if (type == null) {
type = typeName;
}
return type;
}
private String extractIndex(Map<String, Object> ctx) {
String index = (String) ctx.get("_index");
if (index == null) {
index = indexName;
}
return index;
}
private class Indexer implements Runnable {
@Override public void run() {
while (true) {