From 954f3dfdc52c3f2b7bd26752088075bc471f8598 Mon Sep 17 00:00:00 2001 From: Steven Rowe Date: Tue, 8 Jul 2014 03:20:54 +0000 Subject: [PATCH] SOLR-6180: Callers of ManagedIndexSchema mutators should hold the schemaUpdateLock. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1608646 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 3 + .../schema/CopyFieldCollectionResource.java | 14 ++- .../rest/schema/FieldCollectionResource.java | 14 ++- .../solr/rest/schema/FieldResource.java | 14 ++- .../org/apache/solr/schema/IndexSchema.java | 33 +++++- .../solr/schema/ManagedIndexSchema.java | 84 +++++++------- ...AddSchemaFieldsUpdateProcessorFactory.java | 18 +-- .../TestCloudManagedSchemaConcurrent.java | 109 +++++++++++++++++- 8 files changed, 207 insertions(+), 82 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 8f23890e5de..4cf69235c9c 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -157,6 +157,9 @@ Bug Fixes * SOLR-6223: SearchComponents may throw NPE when using shards.tolerant and there is a failure in the 'GET_FIELDS/GET_HIGHLIGHTS/GET_DEBUG' phase. (Tomás Fernández Löbbe via shalin) + +* SOLR-6180: Callers of ManagedIndexSchema mutators should hold the schemaUpdateLock. + (Gregory Chanan via Steve Rowe) Optimizations --------------------- diff --git a/solr/core/src/java/org/apache/solr/rest/schema/CopyFieldCollectionResource.java b/solr/core/src/java/org/apache/solr/rest/schema/CopyFieldCollectionResource.java index 25c284f1b88..bcd9a747e45 100644 --- a/solr/core/src/java/org/apache/solr/rest/schema/CopyFieldCollectionResource.java +++ b/solr/core/src/java/org/apache/solr/rest/schema/CopyFieldCollectionResource.java @@ -171,12 +171,14 @@ public class CopyFieldCollectionResource extends BaseFieldResource implements GE boolean success = false; while (!success) { try { - IndexSchema newSchema = oldSchema.addCopyFields(fieldsToCopy); - if (null != newSchema) { - getSolrCore().setLatestSchema(newSchema); - success = true; - } else { - throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add fields."); + synchronized (oldSchema.getSchemaUpdateLock()) { + IndexSchema newSchema = oldSchema.addCopyFields(fieldsToCopy); + if (null != newSchema) { + getSolrCore().setLatestSchema(newSchema); + success = true; + } else { + throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add fields."); + } } } catch (ManagedIndexSchema.SchemaChangedInZkException e) { log.debug("Schema changed while processing request, retrying"); diff --git a/solr/core/src/java/org/apache/solr/rest/schema/FieldCollectionResource.java b/solr/core/src/java/org/apache/solr/rest/schema/FieldCollectionResource.java index 6d474528082..cf2a33d169f 100644 --- a/solr/core/src/java/org/apache/solr/rest/schema/FieldCollectionResource.java +++ b/solr/core/src/java/org/apache/solr/rest/schema/FieldCollectionResource.java @@ -195,12 +195,14 @@ public class FieldCollectionResource extends BaseFieldResource implements GETabl } } firstAttempt = false; - IndexSchema newSchema = oldSchema.addFields(newFields, copyFields); - if (null != newSchema) { - getSolrCore().setLatestSchema(newSchema); - success = true; - } else { - throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add fields."); + synchronized (oldSchema.getSchemaUpdateLock()) { + IndexSchema newSchema = oldSchema.addFields(newFields, copyFields); + if (null != newSchema) { + getSolrCore().setLatestSchema(newSchema); + success = true; + } else { + throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add fields."); + } } } catch (ManagedIndexSchema.SchemaChangedInZkException e) { log.debug("Schema changed while processing request, retrying"); diff --git a/solr/core/src/java/org/apache/solr/rest/schema/FieldResource.java b/solr/core/src/java/org/apache/solr/rest/schema/FieldResource.java index 0ba637fef21..e37294df25a 100644 --- a/solr/core/src/java/org/apache/solr/rest/schema/FieldResource.java +++ b/solr/core/src/java/org/apache/solr/rest/schema/FieldResource.java @@ -166,12 +166,14 @@ public class FieldResource extends BaseFieldResource implements GETable, PUTable while (!success) { try { SchemaField newField = oldSchema.newField(fieldName, fieldType, map); - IndexSchema newSchema = oldSchema.addField(newField, copyFieldNames); - if (null != newSchema) { - getSolrCore().setLatestSchema(newSchema); - success = true; - } else { - throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add field."); + synchronized (oldSchema.getSchemaUpdateLock()) { + IndexSchema newSchema = oldSchema.addField(newField, copyFieldNames); + if (null != newSchema) { + getSolrCore().setLatestSchema(newSchema); + success = true; + } else { + throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add field."); + } } } catch (ManagedIndexSchema.SchemaChangedInZkException e) { log.debug("Schema changed while processing request, retrying"); diff --git a/solr/core/src/java/org/apache/solr/schema/IndexSchema.java b/solr/core/src/java/org/apache/solr/schema/IndexSchema.java index 5a9adb11521..67ecb951161 100644 --- a/solr/core/src/java/org/apache/solr/schema/IndexSchema.java +++ b/solr/core/src/java/org/apache/solr/schema/IndexSchema.java @@ -1473,7 +1473,9 @@ public class IndexSchema { } /** - * Copies this schema, adds the given field to the copy, then persists the new schema. + * Copies this schema, adds the given field to the copy, then persists the + * new schema. Requires synchronizing on the object returned by + * {@link #getSchemaUpdateLock()}. * * @param newField the SchemaField to add * @return a new IndexSchema based on this schema with newField added @@ -1486,7 +1488,9 @@ public class IndexSchema { } /** - * Copies this schema, adds the given field to the copy, then persists the new schema. + * Copies this schema, adds the given field to the copy, then persists the + * new schema. Requires synchronizing on the object returned by + * {@link #getSchemaUpdateLock()}. * * @param newField the SchemaField to add * @param copyFieldNames 0 or more names of targets to copy this field to. The targets must already exist. @@ -1500,7 +1504,9 @@ public class IndexSchema { } /** - * Copies this schema, adds the given fields to the copy, then persists the new schema. + * Copies this schema, adds the given fields to the copy, then persists the + * new schema. Requires synchronizing on the object returned by + * {@link #getSchemaUpdateLock()}. * * @param newFields the SchemaFields to add * @return a new IndexSchema based on this schema with newFields added @@ -1513,7 +1519,9 @@ public class IndexSchema { } /** - * Copies this schema, adds the given fields to the copy, then persists the new schema. + * Copies this schema, adds the given fields to the copy, then persists the + * new schema. Requires synchronizing on the object returned by + * {@link #getSchemaUpdateLock()}. * * @param newFields the SchemaFields to add * @param copyFieldNames 0 or more names of targets to copy this field to. The target fields must already exist. @@ -1527,7 +1535,10 @@ public class IndexSchema { } /** - * Copies this schema and adds the new copy fields to the copy, then persists the new schema + * Copies this schema and adds the new copy fields to the copy, then + * persists the new schema. Requires synchronizing on the object returned by + * {@link #getSchemaUpdateLock()}. + * * @param copyFields Key is the name of the source field name, value is a collection of target field names. Fields must exist. * @return The new Schema with the copy fields added */ @@ -1554,4 +1565,16 @@ public class IndexSchema { log.error(msg); throw new SolrException(ErrorCode.SERVER_ERROR, msg); } + + /** + * Returns the schema update lock that should be synchronzied on + * to update the schema. Only applicable to mutable schemas. + * + * @return the schema update lock object to synchronize on + */ + public Object getSchemaUpdateLock() { + String msg = "This IndexSchema is not mutable."; + log.error(msg); + throw new SolrException(ErrorCode.SERVER_ERROR, msg); + } } diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java index 73565626d16..0c88d4e8342 100644 --- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java +++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java @@ -209,33 +209,30 @@ public final class ManagedIndexSchema extends IndexSchema { if (copyFieldNames == null){ copyFieldNames = Collections.emptyMap(); } - // even though fields is volatile, we need to synchronize to avoid two addFields - // happening concurrently (and ending up missing one of them) - synchronized (getSchemaUpdateLock()) { - newSchema = shallowCopy(true); + newSchema = shallowCopy(true); - for (SchemaField newField : newFields) { - if (null != newSchema.getFieldOrNull(newField.getName())) { - String msg = "Field '" + newField.getName() + "' already exists."; - throw new FieldExistsException(ErrorCode.BAD_REQUEST, msg); - } - newSchema.fields.put(newField.getName(), newField); + for (SchemaField newField : newFields) { + if (null != newSchema.getFieldOrNull(newField.getName())) { + String msg = "Field '" + newField.getName() + "' already exists."; + throw new FieldExistsException(ErrorCode.BAD_REQUEST, msg); + } + newSchema.fields.put(newField.getName(), newField); - if (null != newField.getDefaultValue()) { - log.debug(newField.getName() + " contains default value: " + newField.getDefaultValue()); - newSchema.fieldsWithDefaultValue.add(newField); - } - if (newField.isRequired()) { - log.debug("{} is required in this schema", newField.getName()); - newSchema.requiredFields.add(newField); - } - Collection copyFields = copyFieldNames.get(newField.getName()); - if (copyFields != null) { - for (String copyField : copyFields) { - newSchema.registerCopyField(newField.getName(), copyField); - } + if (null != newField.getDefaultValue()) { + log.debug(newField.getName() + " contains default value: " + newField.getDefaultValue()); + newSchema.fieldsWithDefaultValue.add(newField); + } + if (newField.isRequired()) { + log.debug("{} is required in this schema", newField.getName()); + newSchema.requiredFields.add(newField); + } + Collection copyFields = copyFieldNames.get(newField.getName()); + if (copyFields != null) { + for (String copyField : copyFields) { + newSchema.registerCopyField(newField.getName(), copyField); } } + // Run the callbacks on SchemaAware now that everything else is done for (SchemaAware aware : newSchema.schemaAware) { aware.inform(newSchema); @@ -261,30 +258,26 @@ public final class ManagedIndexSchema extends IndexSchema { ManagedIndexSchema newSchema = null; if (isMutable) { boolean success = false; - // even though fields is volatile, we need to synchronize to avoid two addCopyFields - // happening concurrently (and ending up missing one of them) - synchronized (getSchemaUpdateLock()) { - newSchema = shallowCopy(true); - for (Map.Entry> entry : copyFields.entrySet()) { - //Key is the name of the field, values are the destinations + newSchema = shallowCopy(true); + for (Map.Entry> entry : copyFields.entrySet()) { + //Key is the name of the field, values are the destinations - for (String destination : entry.getValue()) { - newSchema.registerCopyField(entry.getKey(), destination); - } - } - //TODO: move this common stuff out to shared methods - // Run the callbacks on SchemaAware now that everything else is done - for (SchemaAware aware : newSchema.schemaAware) { - aware.inform(newSchema); - } - newSchema.refreshAnalyzers(); - success = newSchema.persistManagedSchema(false); // don't just create - update it if it already exists - if (success) { - log.debug("Added copy fields for {} sources", copyFields.size()); - } else { - log.error("Failed to add copy fields for {} sources", copyFields.size()); + for (String destination : entry.getValue()) { + newSchema.registerCopyField(entry.getKey(), destination); } } + //TODO: move this common stuff out to shared methods + // Run the callbacks on SchemaAware now that everything else is done + for (SchemaAware aware : newSchema.schemaAware) { + aware.inform(newSchema); + } + newSchema.refreshAnalyzers(); + success = newSchema.persistManagedSchema(false); // don't just create - update it if it already exists + if (success) { + log.debug("Added copy fields for {} sources", copyFields.size()); + } else { + log.error("Failed to add copy fields for {} sources", copyFields.size()); + } } return newSchema; } @@ -431,7 +424,8 @@ public final class ManagedIndexSchema extends IndexSchema { return newSchema; } - + + @Override public Object getSchemaUpdateLock() { return schemaUpdateLock; } diff --git a/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java index 0cd40ea0fa0..000f7133765 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java +++ b/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java @@ -315,14 +315,16 @@ public class AddSchemaFieldsUpdateProcessorFactory extends UpdateRequestProcesso log.debug(builder.toString()); } try { - IndexSchema newSchema = oldSchema.addFields(newFields); - if (null != newSchema) { - cmd.getReq().getCore().setLatestSchema(newSchema); - cmd.getReq().updateSchemaToLatest(); - log.debug("Successfully added field(s) to the schema."); - break; // success - exit from the retry loop - } else { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed to add fields."); + synchronized (oldSchema.getSchemaUpdateLock()) { + IndexSchema newSchema = oldSchema.addFields(newFields); + if (null != newSchema) { + cmd.getReq().getCore().setLatestSchema(newSchema); + cmd.getReq().updateSchemaToLatest(); + log.debug("Successfully added field(s) to the schema."); + break; // success - exit from the retry loop + } else { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed to add fields."); + } } } catch(ManagedIndexSchema.FieldExistsException e) { log.debug("At least one field to be added already exists in the schema - retrying."); diff --git a/solr/core/src/test/org/apache/solr/schema/TestCloudManagedSchemaConcurrent.java b/solr/core/src/test/org/apache/solr/schema/TestCloudManagedSchemaConcurrent.java index 49aba50d4c7..f6249ed85f9 100644 --- a/solr/core/src/test/org/apache/solr/schema/TestCloudManagedSchemaConcurrent.java +++ b/solr/core/src/test/org/apache/solr/schema/TestCloudManagedSchemaConcurrent.java @@ -111,18 +111,19 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB verifySuccess(request, response); } - private String[] getExpectedFieldResponses(int numAddFieldPuts, int numAddFieldPosts) { + private String[] getExpectedFieldResponses(int numAddFieldPuts, String putFieldName, + int numAddFieldPosts, String postFieldName) { String[] expectedAddFields = new String[1 + numAddFieldPuts + numAddFieldPosts]; expectedAddFields[0] = SUCCESS_XPATH; for (int i = 0; i < numAddFieldPuts; ++i) { - String newFieldName = "newfieldPut" + i; + String newFieldName = putFieldName + i; expectedAddFields[1 + i] = "/response/arr[@name='fields']/lst/str[@name='name'][.='" + newFieldName + "']"; } for (int i = 0; i < numAddFieldPosts; ++i) { - String newFieldName = "newfieldPost" + i; + String newFieldName = postFieldName + i; expectedAddFields[1 + numAddFieldPuts + i] = "/response/arr[@name='fields']/lst/str[@name='name'][.='" + newFieldName + "']"; } @@ -148,6 +149,11 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB @Override public void doTest() throws Exception { setupHarnesses(); + concurrentOperationsTest(); + schemaLockTest(); + } + + private void concurrentOperationsTest() throws Exception { // First, add a bunch of fields via PUT and POST, as well as copyFields, // but do it fast enough and verify shards' schemas after all of them are added @@ -155,16 +161,19 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB int numAddFieldPuts = 0; int numAddFieldPosts = 0; List copyFields = new ArrayList<>(); + + final String putFieldName = "newfieldPut"; + final String postFieldName = "newfieldPost"; for (int i = 0; i <= numFields ; ++i) { RestTestHarness publisher = restTestHarnesses.get(r.nextInt(restTestHarnesses.size())); int type = random().nextInt(3); if (type == 0) { // send an add field via PUT - addFieldPut(publisher, "newfieldPut" + numAddFieldPuts++); + addFieldPut(publisher, putFieldName + numAddFieldPuts++); } else if (type == 1) { // send an add field via POST - addFieldPost(publisher, "newfieldPost" + numAddFieldPosts++); + addFieldPost(publisher, postFieldName + numAddFieldPosts++); } else if (type == 2) { // send a copy field String sourceField = null; @@ -196,7 +205,8 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB } } - String[] expectedAddFields = getExpectedFieldResponses(numAddFieldPuts, numAddFieldPosts); + String[] expectedAddFields = getExpectedFieldResponses(numAddFieldPuts, putFieldName, + numAddFieldPosts, postFieldName); String[] expectedCopyFields = getExpectedCopyFieldResponses(copyFields); boolean success = false; @@ -236,6 +246,93 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB } } + private class PutPostThread extends Thread { + RestTestHarness harness; + String fieldName; + boolean isPut; + public PutPostThread(RestTestHarness harness, String fieldName, boolean isPut) { + this.harness = harness; + this.fieldName = fieldName; + this.isPut = isPut; + } + + public void run() { + try { + if (isPut) { + addFieldPut(harness, fieldName); + } else { + addFieldPost(harness, fieldName); + } + } catch (Exception e) { + // log.error("###ACTUAL FAILURE!"); + throw new RuntimeException(e); + } + } + } + + private void schemaLockTest() throws Exception { + + // First, add a bunch of fields via PUT and POST, as well as copyFields, + // but do it fast enough and verify shards' schemas after all of them are added + int numFields = 25; + int numAddFieldPuts = 0; + int numAddFieldPosts = 0; + + final String putFieldName = "newfieldPutThread"; + final String postFieldName = "newfieldPostThread"; + + for (int i = 0; i <= numFields ; ++i) { + // System.err.println("###ITERATION: " + i); + int postHarness = r.nextInt(restTestHarnesses.size()); + RestTestHarness publisher = restTestHarnesses.get(postHarness); + PutPostThread postThread = new PutPostThread(publisher, postFieldName + numAddFieldPosts++, false); + postThread.start(); + + int putHarness = r.nextInt(restTestHarnesses.size()); + publisher = restTestHarnesses.get(putHarness); + PutPostThread putThread = new PutPostThread(publisher, putFieldName + numAddFieldPuts++, true); + putThread.start(); + postThread.join(); + putThread.join(); + + String[] expectedAddFields = getExpectedFieldResponses(numAddFieldPuts, putFieldName, + numAddFieldPosts, postFieldName); + + boolean success = false; + long maxTimeoutMillis = 100000; + long startTime = System.nanoTime(); + String request = null; + String response = null; + String result = null; + + while ( ! success + && TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) < maxTimeoutMillis) { + Thread.sleep(10); + + // int j = 0; + for (RestTestHarness client : restTestHarnesses) { + // System.err.println("###CHECKING HARNESS: " + j++ + " for iteration: " + i); + + // verify addFieldPuts and addFieldPosts + request = "/schema/fields?wt=xml"; + response = client.query(request); + //System.err.println("###RESPONSE: " + response); + result = BaseTestHarness.validateXPath(response, expectedAddFields); + if (result != null) { + // System.err.println("###FAILURE!"); + break; + } + } + success = (result == null); + } + if ( ! success) { + String msg = "QUERY FAILED: xpath=" + result + " request=" + request + " response=" + response; + log.error(msg); + fail(msg); + } + } + } + private static class CopyFieldInfo { private String sourceField; private String destField;