mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
better state control, store under couchdb obj
This commit is contained in:
parent
ecaaeb5250
commit
3a5938b2d1
@ -208,7 +208,8 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
|
|||||||
|
|
||||||
if (lastSeq != null) {
|
if (lastSeq != null) {
|
||||||
try {
|
try {
|
||||||
bulk.add(indexRequest(riverIndexName).type(riverName.name()).id("_seq").source(jsonBuilder().startObject().field("last_seq", lastSeq).endObject()));
|
bulk.add(indexRequest(riverIndexName).type(riverName.name()).id("_seq")
|
||||||
|
.source(jsonBuilder().startObject().startObject("couchdb").field("last_seq", lastSeq).endObject().endObject()));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.warn("failed to add last_seq entry to bulk indexing");
|
logger.warn("failed to add last_seq entry to bulk indexing");
|
||||||
}
|
}
|
||||||
@ -241,7 +242,10 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
|
|||||||
client.admin().indices().prepareRefresh(riverIndexName).execute().actionGet();
|
client.admin().indices().prepareRefresh(riverIndexName).execute().actionGet();
|
||||||
GetResponse lastSeqGetResponse = client.prepareGet(riverIndexName, riverName().name(), "_seq").execute().actionGet();
|
GetResponse lastSeqGetResponse = client.prepareGet(riverIndexName, riverName().name(), "_seq").execute().actionGet();
|
||||||
if (lastSeqGetResponse.exists()) {
|
if (lastSeqGetResponse.exists()) {
|
||||||
lastSeq = lastSeqGetResponse.sourceAsMap().get("last_seq").toString();
|
Map<String, Object> couchdbState = (Map<String, Object>) lastSeqGetResponse.sourceAsMap().get("couchdb");
|
||||||
|
if (couchdbState != null) {
|
||||||
|
lastSeq = couchdbState.get("last_seq").toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("failed to get last_seq, throttling....", e);
|
logger.warn("failed to get last_seq, throttling....", e);
|
||||||
|
@ -31,7 +31,8 @@ import static org.elasticsearch.common.xcontent.XContentFactory.*;
|
|||||||
public class CouchdbRiverTest {
|
public class CouchdbRiverTest {
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
Node node = NodeBuilder.nodeBuilder().settings(ImmutableSettings.settingsBuilder().put("gateway.type", "none")).node();
|
Node node = NodeBuilder.nodeBuilder().settings(ImmutableSettings.settingsBuilder().put("gateway.type", "local")).node();
|
||||||
|
Thread.sleep(1000);
|
||||||
node.client().prepareIndex("_river", "db", "_meta").setSource(jsonBuilder().startObject().field("type", "couchdb").endObject()).execute().actionGet();
|
node.client().prepareIndex("_river", "db", "_meta").setSource(jsonBuilder().startObject().field("type", "couchdb").endObject()).execute().actionGet();
|
||||||
|
|
||||||
Thread.sleep(1000000);
|
Thread.sleep(1000000);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user