BigCouch returns JSON array for sequence, closes #1478.

This commit is contained in:
Shay Banon 2011-11-18 13:35:28 +02:00
parent 6403a42e31
commit 8e9f01a0f7
1 changed files with 27 additions and 8 deletions

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.io.Closeables;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@ -52,6 +53,7 @@ import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@ -201,7 +203,7 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
}
@SuppressWarnings({"unchecked"})
private String processLine(String s, BulkRequestBuilder bulk) {
private Object processLine(String s, BulkRequestBuilder bulk) {
Map<String, Object> ctx;
try {
ctx = XContentFactory.xContent(XContentType.JSON).createParser(s).mapAndClose();
@ -213,7 +215,7 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
logger.warn("received error {}", s);
return null;
}
String seq = ctx.get("seq").toString();
Object seq = ctx.get("seq");
String id = ctx.get("id").toString();
// Ignore design documents
@ -310,8 +312,8 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
continue;
}
BulkRequestBuilder bulk = client.prepareBulk();
String lastSeq = null;
String lineSeq = processLine(s, bulk);
Object lastSeq = null;
Object lineSeq = processLine(s, bulk);
if (lineSeq != null) {
lastSeq = lineSeq;
}
@ -369,14 +371,14 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
return;
}
String lastSeq = null;
Object lastSeq = null;
try {
client.admin().indices().prepareRefresh(riverIndexName).execute().actionGet();
GetResponse lastSeqGetResponse = client.prepareGet(riverIndexName, riverName().name(), "_seq").execute().actionGet();
if (lastSeqGetResponse.exists()) {
Map<String, Object> couchdbState = (Map<String, Object>) lastSeqGetResponse.sourceAsMap().get("couchdb");
if (couchdbState != null) {
lastSeq = couchdbState.get("last_seq").toString();
lastSeq = couchdbState.get("last_seq");
}
}
} catch (Exception e) {
@ -404,11 +406,28 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
}
if (lastSeq != null) {
String lastSeqAsString = null;
if (lastSeq instanceof List) {
// bigcouch uses array for the seq
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
for (Object value : ((List) lastSeq)) {
builder.value(value);
}
builder.endObject();
lastSeqAsString = builder.string();
} catch (Exception e) {
logger.error("failed to convert last_seq to a json string", e);
}
} else {
lastSeqAsString = lastSeq.toString();
}
try {
file = file + "&since=" + URLEncoder.encode(lastSeq, "UTF-8");
file = file + "&since=" + URLEncoder.encode(lastSeqAsString, "UTF-8");
} catch (UnsupportedEncodingException e) {
// should not happen, but in any case...
file = file + "&since=" + lastSeq;
file = file + "&since=" + lastSeqAsString;
}
}