SOLR-6180: Callers of ManagedIndexSchema mutators should hold the schemaUpdateLock.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1608646 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Steven Rowe 2014-07-08 03:20:54 +00:00
parent 474d846bb5
commit 954f3dfdc5
8 changed files with 207 additions and 82 deletions

View File

@ -157,6 +157,9 @@ Bug Fixes
* SOLR-6223: SearchComponents may throw NPE when using shards.tolerant and there is a failure
in the 'GET_FIELDS/GET_HIGHLIGHTS/GET_DEBUG' phase. (Tomás Fernández Löbbe via shalin)
* SOLR-6180: Callers of ManagedIndexSchema mutators should hold the schemaUpdateLock.
(Gregory Chanan via Steve Rowe)
Optimizations
---------------------

View File

@ -171,12 +171,14 @@ public class CopyFieldCollectionResource extends BaseFieldResource implements GE
boolean success = false;
while (!success) {
try {
IndexSchema newSchema = oldSchema.addCopyFields(fieldsToCopy);
if (null != newSchema) {
getSolrCore().setLatestSchema(newSchema);
success = true;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add fields.");
synchronized (oldSchema.getSchemaUpdateLock()) {
IndexSchema newSchema = oldSchema.addCopyFields(fieldsToCopy);
if (null != newSchema) {
getSolrCore().setLatestSchema(newSchema);
success = true;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add fields.");
}
}
} catch (ManagedIndexSchema.SchemaChangedInZkException e) {
log.debug("Schema changed while processing request, retrying");

View File

@ -195,12 +195,14 @@ public class FieldCollectionResource extends BaseFieldResource implements GETabl
}
}
firstAttempt = false;
IndexSchema newSchema = oldSchema.addFields(newFields, copyFields);
if (null != newSchema) {
getSolrCore().setLatestSchema(newSchema);
success = true;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add fields.");
synchronized (oldSchema.getSchemaUpdateLock()) {
IndexSchema newSchema = oldSchema.addFields(newFields, copyFields);
if (null != newSchema) {
getSolrCore().setLatestSchema(newSchema);
success = true;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add fields.");
}
}
} catch (ManagedIndexSchema.SchemaChangedInZkException e) {
log.debug("Schema changed while processing request, retrying");

View File

@ -166,12 +166,14 @@ public class FieldResource extends BaseFieldResource implements GETable, PUTable
while (!success) {
try {
SchemaField newField = oldSchema.newField(fieldName, fieldType, map);
IndexSchema newSchema = oldSchema.addField(newField, copyFieldNames);
if (null != newSchema) {
getSolrCore().setLatestSchema(newSchema);
success = true;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add field.");
synchronized (oldSchema.getSchemaUpdateLock()) {
IndexSchema newSchema = oldSchema.addField(newField, copyFieldNames);
if (null != newSchema) {
getSolrCore().setLatestSchema(newSchema);
success = true;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add field.");
}
}
} catch (ManagedIndexSchema.SchemaChangedInZkException e) {
log.debug("Schema changed while processing request, retrying");

View File

@ -1473,7 +1473,9 @@ public class IndexSchema {
}
/**
* Copies this schema, adds the given field to the copy, then persists the new schema.
* Copies this schema, adds the given field to the copy, then persists the
* new schema. Requires synchronizing on the object returned by
* {@link #getSchemaUpdateLock()}.
*
* @param newField the SchemaField to add
* @return a new IndexSchema based on this schema with newField added
@ -1486,7 +1488,9 @@ public class IndexSchema {
}
/**
* Copies this schema, adds the given field to the copy, then persists the new schema.
* Copies this schema, adds the given field to the copy, then persists the
* new schema. Requires synchronizing on the object returned by
* {@link #getSchemaUpdateLock()}.
*
* @param newField the SchemaField to add
* @param copyFieldNames 0 or more names of targets to copy this field to. The targets must already exist.
@ -1500,7 +1504,9 @@ public class IndexSchema {
}
/**
* Copies this schema, adds the given fields to the copy, then persists the new schema.
* Copies this schema, adds the given fields to the copy, then persists the
* new schema. Requires synchronizing on the object returned by
* {@link #getSchemaUpdateLock()}.
*
* @param newFields the SchemaFields to add
* @return a new IndexSchema based on this schema with newFields added
@ -1513,7 +1519,9 @@ public class IndexSchema {
}
/**
* Copies this schema, adds the given fields to the copy, then persists the new schema.
* Copies this schema, adds the given fields to the copy, then persists the
* new schema. Requires synchronizing on the object returned by
* {@link #getSchemaUpdateLock()}.
*
* @param newFields the SchemaFields to add
* @param copyFieldNames 0 or more names of targets to copy this field to. The target fields must already exist.
@ -1527,7 +1535,10 @@ public class IndexSchema {
}
/**
* Copies this schema and adds the new copy fields to the copy, then persists the new schema
* Copies this schema and adds the new copy fields to the copy, then
* persists the new schema. Requires synchronizing on the object returned by
* {@link #getSchemaUpdateLock()}.
*
* @param copyFields Key is the name of the source field name, value is a collection of target field names. Fields must exist.
* @return The new Schema with the copy fields added
*/
@ -1554,4 +1565,16 @@ public class IndexSchema {
log.error(msg);
throw new SolrException(ErrorCode.SERVER_ERROR, msg);
}
/**
* Returns the schema update lock that should be synchronzied on
* to update the schema. Only applicable to mutable schemas.
*
* @return the schema update lock object to synchronize on
*/
public Object getSchemaUpdateLock() {
String msg = "This IndexSchema is not mutable.";
log.error(msg);
throw new SolrException(ErrorCode.SERVER_ERROR, msg);
}
}

View File

@ -209,33 +209,30 @@ public final class ManagedIndexSchema extends IndexSchema {
if (copyFieldNames == null){
copyFieldNames = Collections.emptyMap();
}
// even though fields is volatile, we need to synchronize to avoid two addFields
// happening concurrently (and ending up missing one of them)
synchronized (getSchemaUpdateLock()) {
newSchema = shallowCopy(true);
newSchema = shallowCopy(true);
for (SchemaField newField : newFields) {
if (null != newSchema.getFieldOrNull(newField.getName())) {
String msg = "Field '" + newField.getName() + "' already exists.";
throw new FieldExistsException(ErrorCode.BAD_REQUEST, msg);
}
newSchema.fields.put(newField.getName(), newField);
for (SchemaField newField : newFields) {
if (null != newSchema.getFieldOrNull(newField.getName())) {
String msg = "Field '" + newField.getName() + "' already exists.";
throw new FieldExistsException(ErrorCode.BAD_REQUEST, msg);
}
newSchema.fields.put(newField.getName(), newField);
if (null != newField.getDefaultValue()) {
log.debug(newField.getName() + " contains default value: " + newField.getDefaultValue());
newSchema.fieldsWithDefaultValue.add(newField);
}
if (newField.isRequired()) {
log.debug("{} is required in this schema", newField.getName());
newSchema.requiredFields.add(newField);
}
Collection<String> copyFields = copyFieldNames.get(newField.getName());
if (copyFields != null) {
for (String copyField : copyFields) {
newSchema.registerCopyField(newField.getName(), copyField);
}
if (null != newField.getDefaultValue()) {
log.debug(newField.getName() + " contains default value: " + newField.getDefaultValue());
newSchema.fieldsWithDefaultValue.add(newField);
}
if (newField.isRequired()) {
log.debug("{} is required in this schema", newField.getName());
newSchema.requiredFields.add(newField);
}
Collection<String> copyFields = copyFieldNames.get(newField.getName());
if (copyFields != null) {
for (String copyField : copyFields) {
newSchema.registerCopyField(newField.getName(), copyField);
}
}
// Run the callbacks on SchemaAware now that everything else is done
for (SchemaAware aware : newSchema.schemaAware) {
aware.inform(newSchema);
@ -261,30 +258,26 @@ public final class ManagedIndexSchema extends IndexSchema {
ManagedIndexSchema newSchema = null;
if (isMutable) {
boolean success = false;
// even though fields is volatile, we need to synchronize to avoid two addCopyFields
// happening concurrently (and ending up missing one of them)
synchronized (getSchemaUpdateLock()) {
newSchema = shallowCopy(true);
for (Map.Entry<String, Collection<String>> entry : copyFields.entrySet()) {
//Key is the name of the field, values are the destinations
newSchema = shallowCopy(true);
for (Map.Entry<String, Collection<String>> entry : copyFields.entrySet()) {
//Key is the name of the field, values are the destinations
for (String destination : entry.getValue()) {
newSchema.registerCopyField(entry.getKey(), destination);
}
}
//TODO: move this common stuff out to shared methods
// Run the callbacks on SchemaAware now that everything else is done
for (SchemaAware aware : newSchema.schemaAware) {
aware.inform(newSchema);
}
newSchema.refreshAnalyzers();
success = newSchema.persistManagedSchema(false); // don't just create - update it if it already exists
if (success) {
log.debug("Added copy fields for {} sources", copyFields.size());
} else {
log.error("Failed to add copy fields for {} sources", copyFields.size());
for (String destination : entry.getValue()) {
newSchema.registerCopyField(entry.getKey(), destination);
}
}
//TODO: move this common stuff out to shared methods
// Run the callbacks on SchemaAware now that everything else is done
for (SchemaAware aware : newSchema.schemaAware) {
aware.inform(newSchema);
}
newSchema.refreshAnalyzers();
success = newSchema.persistManagedSchema(false); // don't just create - update it if it already exists
if (success) {
log.debug("Added copy fields for {} sources", copyFields.size());
} else {
log.error("Failed to add copy fields for {} sources", copyFields.size());
}
}
return newSchema;
}
@ -431,7 +424,8 @@ public final class ManagedIndexSchema extends IndexSchema {
return newSchema;
}
@Override
public Object getSchemaUpdateLock() {
return schemaUpdateLock;
}

View File

@ -315,14 +315,16 @@ public class AddSchemaFieldsUpdateProcessorFactory extends UpdateRequestProcesso
log.debug(builder.toString());
}
try {
IndexSchema newSchema = oldSchema.addFields(newFields);
if (null != newSchema) {
cmd.getReq().getCore().setLatestSchema(newSchema);
cmd.getReq().updateSchemaToLatest();
log.debug("Successfully added field(s) to the schema.");
break; // success - exit from the retry loop
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed to add fields.");
synchronized (oldSchema.getSchemaUpdateLock()) {
IndexSchema newSchema = oldSchema.addFields(newFields);
if (null != newSchema) {
cmd.getReq().getCore().setLatestSchema(newSchema);
cmd.getReq().updateSchemaToLatest();
log.debug("Successfully added field(s) to the schema.");
break; // success - exit from the retry loop
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed to add fields.");
}
}
} catch(ManagedIndexSchema.FieldExistsException e) {
log.debug("At least one field to be added already exists in the schema - retrying.");

View File

@ -111,18 +111,19 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
verifySuccess(request, response);
}
private String[] getExpectedFieldResponses(int numAddFieldPuts, int numAddFieldPosts) {
private String[] getExpectedFieldResponses(int numAddFieldPuts, String putFieldName,
int numAddFieldPosts, String postFieldName) {
String[] expectedAddFields = new String[1 + numAddFieldPuts + numAddFieldPosts];
expectedAddFields[0] = SUCCESS_XPATH;
for (int i = 0; i < numAddFieldPuts; ++i) {
String newFieldName = "newfieldPut" + i;
String newFieldName = putFieldName + i;
expectedAddFields[1 + i]
= "/response/arr[@name='fields']/lst/str[@name='name'][.='" + newFieldName + "']";
}
for (int i = 0; i < numAddFieldPosts; ++i) {
String newFieldName = "newfieldPost" + i;
String newFieldName = postFieldName + i;
expectedAddFields[1 + numAddFieldPuts + i]
= "/response/arr[@name='fields']/lst/str[@name='name'][.='" + newFieldName + "']";
}
@ -148,6 +149,11 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
@Override
public void doTest() throws Exception {
setupHarnesses();
concurrentOperationsTest();
schemaLockTest();
}
private void concurrentOperationsTest() throws Exception {
// First, add a bunch of fields via PUT and POST, as well as copyFields,
// but do it fast enough and verify shards' schemas after all of them are added
@ -155,16 +161,19 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
int numAddFieldPuts = 0;
int numAddFieldPosts = 0;
List<CopyFieldInfo> copyFields = new ArrayList<>();
final String putFieldName = "newfieldPut";
final String postFieldName = "newfieldPost";
for (int i = 0; i <= numFields ; ++i) {
RestTestHarness publisher = restTestHarnesses.get(r.nextInt(restTestHarnesses.size()));
int type = random().nextInt(3);
if (type == 0) { // send an add field via PUT
addFieldPut(publisher, "newfieldPut" + numAddFieldPuts++);
addFieldPut(publisher, putFieldName + numAddFieldPuts++);
}
else if (type == 1) { // send an add field via POST
addFieldPost(publisher, "newfieldPost" + numAddFieldPosts++);
addFieldPost(publisher, postFieldName + numAddFieldPosts++);
}
else if (type == 2) { // send a copy field
String sourceField = null;
@ -196,7 +205,8 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
}
}
String[] expectedAddFields = getExpectedFieldResponses(numAddFieldPuts, numAddFieldPosts);
String[] expectedAddFields = getExpectedFieldResponses(numAddFieldPuts, putFieldName,
numAddFieldPosts, postFieldName);
String[] expectedCopyFields = getExpectedCopyFieldResponses(copyFields);
boolean success = false;
@ -236,6 +246,93 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
}
}
private class PutPostThread extends Thread {
RestTestHarness harness;
String fieldName;
boolean isPut;
public PutPostThread(RestTestHarness harness, String fieldName, boolean isPut) {
this.harness = harness;
this.fieldName = fieldName;
this.isPut = isPut;
}
public void run() {
try {
if (isPut) {
addFieldPut(harness, fieldName);
} else {
addFieldPost(harness, fieldName);
}
} catch (Exception e) {
// log.error("###ACTUAL FAILURE!");
throw new RuntimeException(e);
}
}
}
private void schemaLockTest() throws Exception {
// First, add a bunch of fields via PUT and POST, as well as copyFields,
// but do it fast enough and verify shards' schemas after all of them are added
int numFields = 25;
int numAddFieldPuts = 0;
int numAddFieldPosts = 0;
final String putFieldName = "newfieldPutThread";
final String postFieldName = "newfieldPostThread";
for (int i = 0; i <= numFields ; ++i) {
// System.err.println("###ITERATION: " + i);
int postHarness = r.nextInt(restTestHarnesses.size());
RestTestHarness publisher = restTestHarnesses.get(postHarness);
PutPostThread postThread = new PutPostThread(publisher, postFieldName + numAddFieldPosts++, false);
postThread.start();
int putHarness = r.nextInt(restTestHarnesses.size());
publisher = restTestHarnesses.get(putHarness);
PutPostThread putThread = new PutPostThread(publisher, putFieldName + numAddFieldPuts++, true);
putThread.start();
postThread.join();
putThread.join();
String[] expectedAddFields = getExpectedFieldResponses(numAddFieldPuts, putFieldName,
numAddFieldPosts, postFieldName);
boolean success = false;
long maxTimeoutMillis = 100000;
long startTime = System.nanoTime();
String request = null;
String response = null;
String result = null;
while ( ! success
&& TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) < maxTimeoutMillis) {
Thread.sleep(10);
// int j = 0;
for (RestTestHarness client : restTestHarnesses) {
// System.err.println("###CHECKING HARNESS: " + j++ + " for iteration: " + i);
// verify addFieldPuts and addFieldPosts
request = "/schema/fields?wt=xml";
response = client.query(request);
//System.err.println("###RESPONSE: " + response);
result = BaseTestHarness.validateXPath(response, expectedAddFields);
if (result != null) {
// System.err.println("###FAILURE!");
break;
}
}
success = (result == null);
}
if ( ! success) {
String msg = "QUERY FAILED: xpath=" + result + " request=" + request + " response=" + response;
log.error(msg);
fail(msg);
}
}
}
private static class CopyFieldInfo {
private String sourceField;
private String destField;