From eef4ec2f5c406bf00d657d30745329852c6233b0 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 9 Aug 2011 15:53:13 +0300 Subject: [PATCH] Allow setting _index and _type with CouchDB river, closes #1219. --- .../river/couchdb/CouchdbRiver.java | 44 ++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java b/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java index d5e305cc43c..bd7dc4e7a1d 100644 --- a/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java +++ b/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java @@ -37,11 +37,19 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.indices.IndexAlreadyExistsException; -import org.elasticsearch.river.*; +import org.elasticsearch.river.AbstractRiverComponent; +import org.elasticsearch.river.River; +import org.elasticsearch.river.RiverIndexName; +import org.elasticsearch.river.RiverName; +import org.elasticsearch.river.RiverSettings; import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.ScriptService; -import java.io.*; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; @@ -214,22 +222,46 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { if (ctx.containsKey("ignore") && ctx.get("ignore").equals(Boolean.TRUE)) { // ignore dock } else if (ctx.containsKey("deleted") && ctx.get("deleted").equals(Boolean.TRUE)) { + String index = extractIndex(ctx); + String type = extractType(ctx); if (logger.isTraceEnabled()) { - logger.trace("processing [delete]: [{}]/[{}]/[{}]", indexName, typeName, id); + logger.trace("processing [delete]: [{}]/[{}]/[{}]", index, type, id); } - bulk.add(deleteRequest(indexName).type(typeName).id(id)); + bulk.add(deleteRequest(index).type(type).id(id).routing(extractRouting(ctx))); } else if (ctx.containsKey("doc")) { + String index = extractIndex(ctx); + String type = extractType(ctx); Map doc = (Map) ctx.get("doc"); if (logger.isTraceEnabled()) { - logger.trace("processing [index ]: [{}]/[{}]/[{}], source {}", indexName, typeName, id, doc); + logger.trace("processing [index ]: [{}]/[{}]/[{}], source {}", index, type, id, doc); } - bulk.add(indexRequest(indexName).type(typeName).id(id).source(doc)); + bulk.add(indexRequest(index).type(type).id(id).source(doc).routing(extractRouting(ctx))); } else { logger.warn("ignoring unknown change {}", s); } return seq; } + private String extractRouting(Map ctx) { + return (String) ctx.get("_routing"); + } + + private String extractType(Map ctx) { + String type = (String) ctx.get("_type"); + if (type == null) { + type = typeName; + } + return type; + } + + private String extractIndex(Map ctx) { + String index = (String) ctx.get("_index"); + if (index == null) { + index = indexName; + } + return index; + } + private class Indexer implements Runnable { @Override public void run() { while (true) {