SOLR-5374: missing returns in user versioning processor

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1540336 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2013-11-09 15:56:29 +00:00
parent 9fea8e4a27
commit 2e8469408a
2 changed files with 79 additions and 91 deletions

View File

@ -254,7 +254,7 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
assert null != newUserVersion;
oldSolrVersion = -1;
// log.info("!!!!!!!!! isVersionNewEnough being called for " + indexedDocId.utf8ToString() + " newVersion=" + newUserVersion);
newUserVersion = convertFieldValueUsingType(userVersionField, newUserVersion);
Object oldUserVersion = null;
SolrInputDocument oldDoc = null;
@ -301,6 +301,7 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
oldDoc = RealTimeGetComponent.getInputDocument(core, indexedDocId);
if (null == oldDoc) {
// log.info("VERSION no doc found, returning true");
return true;
}
}
@ -318,6 +319,7 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
oldSolrVersion = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
}
// log.info("VERSION old=" + oldUserVersion + " new=" +newUserVersion );
if ( null == oldUserVersion) {
// could happen if they turn this feature on after building an index
@ -336,12 +338,17 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
try {
if (0 < ((Comparable)newUserVersion).compareTo((Comparable) oldUserVersion)) {
// log.info("VERSION returning true (proceed with update)" );
return true;
}
if (ignoreOldUpdates) {
if (log.isDebugEnabled()) {
log.debug("Dropping update since user version is not high enough: " + newUserVersion + "; old user version=" + oldUserVersion);
}
// log.info("VERSION returning false (dropping update)" );
return false;
} else {
// log.info("VERSION will throw conflict" );
throw new SolrException(CONFLICT,
"user version is not high enough: " + newUserVersion);
}
@ -355,81 +362,6 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
}
private boolean isVersionNewEnoughStoredOnly(BytesRef indexedDocId,
Object newUserVersion) throws IOException {
assert null != indexedDocId;
assert null != newUserVersion;
oldSolrVersion = -1;
// :TODO: would be nice if a full RTG was not always needed here, ideas...
// - first check fieldCache/docVals - if a versionField exists
// in index that is already greater then this cmd, fail fast
// (no need to check updateLog, new version already too low)
// - first check if docId is in the updateLog w/o doing the full get, if
// it's not then check fieldCache/docVals
// - track versionField externally from updateLog (or as a special case
// that can be looked up by itself - similar to how _version_ is dealt with)
//
// Depending on if/when/how this is changed, what we assert about
// versionField on init will need updated.
newUserVersion = convertFieldValueUsingType(userVersionField, newUserVersion);
Object oldUserVersion = null;
SolrInputDocument oldDoc =
RealTimeGetComponent.getInputDocument(core, indexedDocId);
if (null == oldDoc) {
return true;
}
oldUserVersion = oldDoc.getFieldValue(versionFieldName);
if ( null == oldUserVersion) {
// could happen if they turn this feature on after building an index
// w/o the versionField
throw new SolrException(SERVER_ERROR,
"Doc exists in index, but has null versionField: "
+ versionFieldName);
}
// Make the FieldType resolve any conversion we need.
oldUserVersion = convertFieldValueUsingType(userVersionField, oldUserVersion);
if (! (oldUserVersion instanceof Comparable && newUserVersion instanceof Comparable) ) {
throw new SolrException(BAD_REQUEST,
"old version and new version are not comparable: " +
oldUserVersion.getClass()+" vs "+newUserVersion.getClass());
}
try {
if (0 < ((Comparable)newUserVersion).compareTo((Comparable) oldUserVersion)) {
// since we're going to proceed with this update, we need to find the _version_
// so we can use optimistic concurrency.
Object o = oldDoc.getFieldValue(VersionInfo.VERSION_FIELD);
if (o == null) {
throw new SolrException(SERVER_ERROR, "No _version_ for document "+ oldDoc);
}
oldSolrVersion = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
return true;
}
if (ignoreOldUpdates) {
return false;
} else {
throw new SolrException(CONFLICT,
"user version is not high enough: " + newUserVersion);
}
} catch (ClassCastException e) {
throw new SolrException(BAD_REQUEST,
"old version and new version are not comparable: " +
oldUserVersion.getClass()+" vs "+newUserVersion.getClass() +
": " + e.getMessage(), e);
}
}
public boolean isLeader(UpdateCommand cmd) {
if ((cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
@ -439,12 +371,15 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
return false;
}
// if phase==TOLEADER, we can't just assume we are the leader... let the normal logic check.
return distribProc.isLeader(cmd);
boolean x = distribProc.isLeader(cmd);
// log.info("VERSION: checking if we are leader:" + x);
return x;
}
public void processAdd(AddUpdateCommand cmd) throws IOException {
if (!isLeader(cmd)) {
super.processAdd(cmd);
return;
}
final SolrInputDocument newDoc = cmd.getSolrInputDocument();
@ -471,6 +406,7 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
return;
} catch (SolrException e) {
if (e.code() == 409) {
// log.info ("##################### CONFLICT ADDING newDoc=" + newDoc + " newVersion=" + newVersion );
continue; // if a version conflict, retry
}
throw e; // rethrow
@ -512,6 +448,7 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
newCmd.solrDoc = newDoc;
newCmd.commitWithin = cmd.commitWithin;
super.processAdd(newCmd);
return;
}

View File

@ -22,6 +22,7 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.StrUtils;
import org.junit.BeforeClass;
@ -58,8 +59,6 @@ public class TestDistribDocBasedVersion extends AbstractFullDistribZkTestBase {
super.fixShardCount = true; // we only want to test with exactly 2 slices.
/***
hash of a is 3c2569b2 high bits=0 shard=shard3
hash of b is 95de7e03 high bits=2 shard=shard1
@ -102,6 +101,7 @@ public class TestDistribDocBasedVersion extends AbstractFullDistribZkTestBase {
waitForRecoveriesToFinish(false);
doTestDocVersions();
doTestHardFail();
testFinished = true;
} finally {
@ -111,6 +111,28 @@ public class TestDistribDocBasedVersion extends AbstractFullDistribZkTestBase {
}
}
private void doTestHardFail() throws Exception {
log.info("### STARTING doTestHardFail");
// use a leader so we test both forwarding and non-forwarding logic
ss = shardToLeaderJetty.get(bucket1).client.solrClient;
// ss = cloudClient; CloudSolrServer doesn't currently support propagating error codes
doTestHardFail("p!doc1");
doTestHardFail("q!doc1");
doTestHardFail("r!doc1");
doTestHardFail("x!doc1");
}
private void doTestHardFail(String id) throws Exception {
vdelete(id, 5, "update.chain","external-version-failhard");
vadd(id, 10, "update.chain","external-version-failhard");
vadd(id ,15, "update.chain","external-version-failhard");
vaddFail(id ,11, 409, "update.chain","external-version-failhard");
vdeleteFail(id ,11, 409, "update.chain","external-version-failhard");
vdelete(id, 20, "update.chain","external-version-failhard");
}
private void doTestDocVersions() throws Exception {
log.info("### STARTING doTestDocVersions");
@ -159,14 +181,8 @@ public class TestDistribDocBasedVersion extends AbstractFullDistribZkTestBase {
//
// now test with a non-smart client
//
List<CloudJettyRunner> runners = shardToJetty.get(bucket2);
CloudJettyRunner leader = shardToLeaderJetty.get(bucket2);
CloudJettyRunner replica = null;
for (CloudJettyRunner r : runners) {
if (r != leader) replica = r;
}
ss = replica.client.solrClient;
// use a leader so we test both forwarding and non-forwarding logic
ss = shardToLeaderJetty.get(bucket1).client.solrClient;
vadd("b!doc5", 10);
vadd("c!doc6", 11);
@ -222,18 +238,53 @@ public class TestDistribDocBasedVersion extends AbstractFullDistribZkTestBase {
SolrServer ss;
void vdelete(String id, long version) throws Exception {
void vdelete(String id, long version, String... params) throws Exception {
UpdateRequest req = new UpdateRequest();
req.deleteById(id);
req.setParam("del_version", Long.toString(version));
for (int i=0; i<params.length; i+=2) {
req.setParam( params[i], params[i+1]);
}
ss.request(req);
// req.process(cloudClient);
}
void vadd(String id, long version) throws Exception {
index("id", id, vfield, version);
void vadd(String id, long version, String... params) throws Exception {
UpdateRequest req = new UpdateRequest();
req.add(sdoc("id", id, vfield, version));
for (int i=0; i<params.length; i+=2) {
req.setParam( params[i], params[i+1]);
}
ss.request(req);
}
void vaddFail(String id, long version, int errCode, String... params) throws Exception {
boolean failed = false;
try {
vadd(id, version, params);
} catch (SolrException e) {
failed = true;
assertEquals(errCode, e.code());
} catch (Exception e) {
log.error("ERROR", e);
}
assertTrue(failed);
}
void vdeleteFail(String id, long version, int errCode, String... params) throws Exception {
boolean failed = false;
try {
vdelete(id, version, params);
} catch (SolrException e) {
failed = true;
assertEquals(errCode, e.code());
} catch (Exception e) {
log.error("ERROR", e);
}
assertTrue(failed);
}
void doQuery(String expectedDocs, String... queryParams) throws Exception {
List<String> strs = StrUtils.splitSmart(expectedDocs, ",", true);