SOLR-6787, SOLR-6801 use realtime get to verify that the versions do not collide

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1653453 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2015-01-21 07:17:08 +00:00
parent c96c074527
commit 8932af3b4b
2 changed files with 28 additions and 5 deletions

View File

@ -23,6 +23,7 @@ import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
@ -37,6 +38,7 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.UpdateParams;
@ -59,6 +61,7 @@ import org.apache.solr.util.plugin.PluginInfoInitialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonMap;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
public class BlobHandler extends RequestHandlerBase implements PluginInfoInitialized{
@ -126,15 +129,17 @@ public class BlobHandler extends RequestHandlerBase implements PluginInfoInitia
}
version++;
String id = blobName+"/"+version;
log.info(MessageFormat.format("New blob inserting {0} ,size {1}, md5 {2}",id, payload.limit(),md5));
indexMap(req, rsp, makeMap(
Map<String, Object> doc = makeMap(
"id", id,
"md5", md5,
"blobName", blobName,
"version", version,
"timestamp", new Date(),
"size", payload.limit(),
"blob", payload));
"blob", payload);
verifyWithRealtimeGet(blobName, version, req, doc);
log.info(MessageFormat.format("New blob inserting {0} ,size {1}, md5 {2}",doc.get("id"), payload.limit(),md5));
indexMap(req, rsp, doc);
log.info(" Successfully Added and committed a blob with id {} and size {} ",id, payload.limit());
break;
@ -202,6 +207,25 @@ public class BlobHandler extends RequestHandlerBase implements PluginInfoInitia
}
}
private void verifyWithRealtimeGet(String blobName, long version, SolrQueryRequest req, Map<String, Object> doc) {
for(;;) {
SolrQueryResponse response = new SolrQueryResponse();
String id = blobName + "/" + version;
req.forward("/get", new MapSolrParams(singletonMap("id", id)), response);
if(response.getValues().get("doc") == null) {
//ensure that the version does not exist
return;
} else {
log.info("id {} already exists trying next ",id);
version++;
doc.put("version", version);
id = blobName + "/" + version;
doc.put("id", id);
}
}
}
public static void indexMap(SolrQueryRequest req, SolrQueryResponse rsp, Map<String, Object> doc) throws IOException {
SolrInputDocument solrDoc = new SolrInputDocument();
for (Map.Entry<String, Object> e : doc.entrySet()) solrDoc.addField(e.getKey(),e.getValue());

View File

@ -177,9 +177,8 @@ public class TestBlobHandler extends AbstractFullDistribZkTestBase {
HttpPost httpPost = null;
HttpEntity entity;
String response = null;
Replica leader = cloudClient.getZkStateReader().getClusterState().getCollection(".system").getActiveSlices().iterator().next().getLeader();
try {
httpPost = new HttpPost(leader.getStr(ZkStateReader.BASE_URL_PROP) +"/.system/blob/test");
httpPost = new HttpPost(baseUrl+"/.system/blob/test");
httpPost.setHeader("Content-Type","application/octet-stream");
httpPost.setEntity(new ByteArrayEntity(bytarr.array(), bytarr.arrayOffset(), bytarr.limit()));
entity = cloudClient.getLbClient().getHttpClient().execute(httpPost).getEntity();