diff --git a/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java b/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java index c2856f69e24..6c8753f2df1 100644 --- a/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java +++ b/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java @@ -208,7 +208,8 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { if (lastSeq != null) { try { - bulk.add(indexRequest(riverIndexName).type(riverName.name()).id("_seq").source(jsonBuilder().startObject().field("last_seq", lastSeq).endObject())); + bulk.add(indexRequest(riverIndexName).type(riverName.name()).id("_seq") + .source(jsonBuilder().startObject().startObject("couchdb").field("last_seq", lastSeq).endObject().endObject())); } catch (IOException e) { logger.warn("failed to add last_seq entry to bulk indexing"); } @@ -241,7 +242,10 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { client.admin().indices().prepareRefresh(riverIndexName).execute().actionGet(); GetResponse lastSeqGetResponse = client.prepareGet(riverIndexName, riverName().name(), "_seq").execute().actionGet(); if (lastSeqGetResponse.exists()) { - lastSeq = lastSeqGetResponse.sourceAsMap().get("last_seq").toString(); + Map couchdbState = (Map) lastSeqGetResponse.sourceAsMap().get("couchdb"); + if (couchdbState != null) { + lastSeq = couchdbState.get("last_seq").toString(); + } } } catch (Exception e) { logger.warn("failed to get last_seq, throttling....", e); diff --git a/plugins/river/couchdb/src/test/java/org/elasticsearch/river/couchdb/CouchdbRiverTest.java b/plugins/river/couchdb/src/test/java/org/elasticsearch/river/couchdb/CouchdbRiverTest.java index d9c7c532716..02fab0c5fff 100644 --- a/plugins/river/couchdb/src/test/java/org/elasticsearch/river/couchdb/CouchdbRiverTest.java +++ b/plugins/river/couchdb/src/test/java/org/elasticsearch/river/couchdb/CouchdbRiverTest.java @@ -31,7 +31,8 @@ import static org.elasticsearch.common.xcontent.XContentFactory.*; public class CouchdbRiverTest { public static void main(String[] args) throws Exception { - Node node = NodeBuilder.nodeBuilder().settings(ImmutableSettings.settingsBuilder().put("gateway.type", "none")).node(); + Node node = NodeBuilder.nodeBuilder().settings(ImmutableSettings.settingsBuilder().put("gateway.type", "local")).node(); + Thread.sleep(1000); node.client().prepareIndex("_river", "db", "_meta").setSource(jsonBuilder().startObject().field("type", "couchdb").endObject()).execute().actionGet(); Thread.sleep(1000000);