mirror of https://github.com/apache/lucene.git
SOLR-6145: Fix Schema API optimistic concurrency by moving it out of ManagedIndexSchema.add(Copy)Fields() into the consumers of those methods: CopyFieldCollectionResource, FieldCollectionResource, FieldResource, and AddSchemaFieldsUpdateProcessorFactory.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1601770 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0b621ca842
commit
5f2af20b04
|
@ -160,6 +160,12 @@ Bug Fixes
|
|||
* SOLR-6149: Specifying the query value without any index value does not work in
|
||||
Analysis browser. (Aman Tandon, shalin)
|
||||
|
||||
* SOLR-6145: Fix Schema API optimistic concurrency by moving it out of
|
||||
ManagedIndexSchema.add(Copy)Fields() into the consumers of those methods:
|
||||
CopyFieldCollectionResource, FieldCollectionResource, FieldResource,
|
||||
and AddSchemaFieldsUpdateProcessorFactory.
|
||||
(Gregory Chanan, Alexey Serba, Steve Rowe)
|
||||
|
||||
* SOLR-6146: Incorrect configuration such as wrong chroot in zk server address can
|
||||
cause CloudSolrServer to leak resources. (Jessica Cheng, Varun Thacker, shalin)
|
||||
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.restlet.resource.ResourceException;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -40,6 +39,8 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.solr.common.SolrException.ErrorCode;
|
||||
|
||||
/**
|
||||
* This class responds to requests at /solr/(corename)/schema/copyfields
|
||||
* <p/>
|
||||
|
@ -110,13 +111,13 @@ public class CopyFieldCollectionResource extends BaseFieldResource implements GE
|
|||
try {
|
||||
if (!getSchema().isMutable()) {
|
||||
final String message = "This IndexSchema is not mutable.";
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, message);
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, message);
|
||||
} else {
|
||||
if (!entity.getMediaType().equals(MediaType.APPLICATION_JSON, true)) {
|
||||
String message = "Only media type " + MediaType.APPLICATION_JSON.toString() + " is accepted."
|
||||
+ " Request has media type " + entity.getMediaType().toString() + ".";
|
||||
log.error(message);
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, message);
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, message);
|
||||
} else {
|
||||
Object object = ObjectBuilder.fromJSON(entity.getText());
|
||||
|
||||
|
@ -124,7 +125,7 @@ public class CopyFieldCollectionResource extends BaseFieldResource implements GE
|
|||
String message = "Invalid JSON type " + object.getClass().getName() + ", expected List of the form"
|
||||
+ " (ignore the backslashes): [{\"source\":\"foo\",\"dest\":\"comma-separated list of targets\"}, {...}, ...]";
|
||||
log.error(message);
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, message);
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, message);
|
||||
} else {
|
||||
List<Map<String, Object>> list = (List<Map<String, Object>>) object;
|
||||
Map<String, Collection<String>> fieldsToCopy = new HashMap<>();
|
||||
|
@ -135,7 +136,7 @@ public class CopyFieldCollectionResource extends BaseFieldResource implements GE
|
|||
if (null == fieldName) {
|
||||
String message = "Missing '" + IndexSchema.SOURCE + "' mapping.";
|
||||
log.error(message);
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, message);
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, message);
|
||||
}
|
||||
Object dest = map.get(IndexSchema.DESTINATION);
|
||||
List<String> destinations = null;
|
||||
|
@ -147,7 +148,7 @@ public class CopyFieldCollectionResource extends BaseFieldResource implements GE
|
|||
} else {
|
||||
String message = "Invalid '" + IndexSchema.DESTINATION + "' type.";
|
||||
log.error(message);
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, message);
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, message);
|
||||
}
|
||||
}
|
||||
if (destinations == null) {
|
||||
|
@ -165,11 +166,22 @@ public class CopyFieldCollectionResource extends BaseFieldResource implements GE
|
|||
message.setLength(message.length() - 2);//drop the last ,
|
||||
}
|
||||
log.error(message.toString().trim());
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, message.toString().trim());
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, message.toString().trim());
|
||||
}
|
||||
IndexSchema newSchema = oldSchema.addCopyFields(fieldsToCopy);
|
||||
if (newSchema != null) {
|
||||
getSolrCore().setLatestSchema(newSchema);
|
||||
boolean success = false;
|
||||
while (!success) {
|
||||
try {
|
||||
IndexSchema newSchema = oldSchema.addCopyFields(fieldsToCopy);
|
||||
if (null != newSchema) {
|
||||
getSolrCore().setLatestSchema(newSchema);
|
||||
success = true;
|
||||
} else {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add fields.");
|
||||
}
|
||||
} catch (ManagedIndexSchema.SchemaChangedInZkException e) {
|
||||
log.debug("Schema changed while processing request, retrying");
|
||||
oldSchema = (ManagedIndexSchema)getSolrCore().getLatestSchema();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.solr.common.util.SimpleOrderedMap;
|
|||
import org.apache.solr.rest.GETable;
|
||||
import org.apache.solr.rest.POSTable;
|
||||
import org.apache.solr.schema.IndexSchema;
|
||||
import org.apache.solr.schema.ManagedIndexSchema;
|
||||
import org.apache.solr.schema.SchemaField;
|
||||
import org.noggit.ObjectBuilder;
|
||||
import org.restlet.data.MediaType;
|
||||
|
@ -139,9 +140,9 @@ public class FieldCollectionResource extends BaseFieldResource implements GETabl
|
|||
} else {
|
||||
List<Map<String, Object>> list = (List<Map<String, Object>>) object;
|
||||
List<SchemaField> newFields = new ArrayList<>();
|
||||
List<NewFieldArguments> newFieldArguments = new ArrayList<>();
|
||||
IndexSchema oldSchema = getSchema();
|
||||
Map<String, Collection<String>> copyFields = new HashMap<>();
|
||||
Set<String> malformed = new HashSet<>();
|
||||
for (Map<String, Object> map : list) {
|
||||
String fieldName = (String) map.remove(IndexSchema.NAME);
|
||||
if (null == fieldName) {
|
||||
|
@ -174,10 +175,38 @@ public class FieldCollectionResource extends BaseFieldResource implements GETabl
|
|||
copyFields.put(fieldName, copyTo);
|
||||
}
|
||||
newFields.add(oldSchema.newField(fieldName, fieldType, map));
|
||||
newFieldArguments.add(new NewFieldArguments(fieldName, fieldType, map));
|
||||
}
|
||||
boolean firstAttempt = true;
|
||||
boolean success = false;
|
||||
while (!success) {
|
||||
try {
|
||||
if (!firstAttempt) {
|
||||
// If this isn't the first attempt, we must have failed due to
|
||||
// the schema changing in Zk during optimistic concurrency control.
|
||||
// In that case, rerun creating the new fields, because they may
|
||||
// fail now due to changes in the schema. This behavior is consistent
|
||||
// with what would happen if we locked the schema and the other schema
|
||||
// change went first.
|
||||
newFields.clear();
|
||||
for (NewFieldArguments args : newFieldArguments) {
|
||||
newFields.add(oldSchema.newField(
|
||||
args.getName(), args.getType(), args.getMap()));
|
||||
}
|
||||
}
|
||||
firstAttempt = false;
|
||||
IndexSchema newSchema = oldSchema.addFields(newFields, copyFields);
|
||||
if (null != newSchema) {
|
||||
getSolrCore().setLatestSchema(newSchema);
|
||||
success = true;
|
||||
} else {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add fields.");
|
||||
}
|
||||
} catch (ManagedIndexSchema.SchemaChangedInZkException e) {
|
||||
log.debug("Schema changed while processing request, retrying");
|
||||
oldSchema = getSolrCore().getLatestSchema();
|
||||
}
|
||||
}
|
||||
IndexSchema newSchema = oldSchema.addFields(newFields, copyFields);
|
||||
|
||||
getSolrCore().setLatestSchema(newSchema);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -188,4 +217,19 @@ public class FieldCollectionResource extends BaseFieldResource implements GETabl
|
|||
|
||||
return new SolrOutputRepresentation();
|
||||
}
|
||||
|
||||
private static class NewFieldArguments {
|
||||
private String name;
|
||||
private String type;
|
||||
Map<String, Object> map;
|
||||
NewFieldArguments(String name, String type, Map<String, Object> map){
|
||||
this.name = name;
|
||||
this.type = type;
|
||||
this.map = map;
|
||||
}
|
||||
|
||||
public String getName() { return name; }
|
||||
public String getType() { return type; }
|
||||
public Map<String, Object> getMap() { return map; }
|
||||
}
|
||||
}
|
||||
|
|
|
@ -162,9 +162,22 @@ public class FieldResource extends BaseFieldResource implements GETable, PUTable
|
|||
if (copyFieldNames != null) {
|
||||
map.remove(IndexSchema.COPY_FIELDS);
|
||||
}
|
||||
SchemaField newField = oldSchema.newField(fieldName, fieldType, map);
|
||||
IndexSchema newSchema = oldSchema.addField(newField, copyFieldNames);
|
||||
getSolrCore().setLatestSchema(newSchema);
|
||||
boolean success = false;
|
||||
while (!success) {
|
||||
try {
|
||||
SchemaField newField = oldSchema.newField(fieldName, fieldType, map);
|
||||
IndexSchema newSchema = oldSchema.addField(newField, copyFieldNames);
|
||||
if (null != newSchema) {
|
||||
getSolrCore().setLatestSchema(newSchema);
|
||||
success = true;
|
||||
} else {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to add field.");
|
||||
}
|
||||
} catch (ManagedIndexSchema.SchemaChangedInZkException e) {
|
||||
log.debug("Schema changed while processing request, retrying");
|
||||
oldSchema = (ManagedIndexSchema)getSolrCore().getLatestSchema();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -129,6 +129,7 @@ public final class ManagedIndexSchema extends IndexSchema {
|
|||
final SolrZkClient zkClient = zkController.getZkClient();
|
||||
final String managedSchemaPath = zkLoader.getCollectionZkPath() + "/" + managedSchemaResourceName;
|
||||
boolean success = true;
|
||||
boolean schemaChangedInZk = false;
|
||||
try {
|
||||
// Persist the managed schema
|
||||
StringWriter writer = new StringWriter();
|
||||
|
@ -151,9 +152,8 @@ public final class ManagedIndexSchema extends IndexSchema {
|
|||
schemaZkVersion = stat.getVersion();
|
||||
log.info("Persisted managed schema at " + managedSchemaPath);
|
||||
} catch (KeeperException.BadVersionException e) {
|
||||
log.info("Failed to persist managed schema at " + managedSchemaPath
|
||||
+ " - version mismatch");
|
||||
success = false;
|
||||
schemaChangedInZk = true;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -164,6 +164,12 @@ public final class ManagedIndexSchema extends IndexSchema {
|
|||
log.error(msg, e);
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, msg, e);
|
||||
}
|
||||
if (schemaChangedInZk) {
|
||||
String msg = "Failed to persist managed schema at " + managedSchemaPath
|
||||
+ " - version mismatch";
|
||||
log.info(msg);
|
||||
throw new SchemaChangedInZkException(ErrorCode.CONFLICT, msg + ", retry.");
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
|
@ -183,6 +189,12 @@ public final class ManagedIndexSchema extends IndexSchema {
|
|||
}
|
||||
}
|
||||
|
||||
public class SchemaChangedInZkException extends SolrException {
|
||||
public SchemaChangedInZkException(ErrorCode code, String msg) {
|
||||
super(code, msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ManagedIndexSchema addFields(Collection<SchemaField> newFields) {
|
||||
return addFields(newFields, Collections.<String, Collection<String>>emptyMap());
|
||||
|
@ -196,45 +208,44 @@ public final class ManagedIndexSchema extends IndexSchema {
|
|||
if (copyFieldNames == null){
|
||||
copyFieldNames = Collections.emptyMap();
|
||||
}
|
||||
while ( ! success) { // optimistic concurrency
|
||||
// even though fields is volatile, we need to synchronize to avoid two addFields
|
||||
// happening concurrently (and ending up missing one of them)
|
||||
synchronized (getSchemaUpdateLock()) {
|
||||
newSchema = shallowCopy(true);
|
||||
// even though fields is volatile, we need to synchronize to avoid two addFields
|
||||
// happening concurrently (and ending up missing one of them)
|
||||
synchronized (getSchemaUpdateLock()) {
|
||||
newSchema = shallowCopy(true);
|
||||
|
||||
for (SchemaField newField : newFields) {
|
||||
if (null != newSchema.getFieldOrNull(newField.getName())) {
|
||||
String msg = "Field '" + newField.getName() + "' already exists.";
|
||||
throw new FieldExistsException(ErrorCode.BAD_REQUEST, msg);
|
||||
}
|
||||
newSchema.fields.put(newField.getName(), newField);
|
||||
for (SchemaField newField : newFields) {
|
||||
if (null != newSchema.getFieldOrNull(newField.getName())) {
|
||||
String msg = "Field '" + newField.getName() + "' already exists.";
|
||||
throw new FieldExistsException(ErrorCode.BAD_REQUEST, msg);
|
||||
}
|
||||
newSchema.fields.put(newField.getName(), newField);
|
||||
|
||||
if (null != newField.getDefaultValue()) {
|
||||
log.debug(newField.getName() + " contains default value: " + newField.getDefaultValue());
|
||||
newSchema.fieldsWithDefaultValue.add(newField);
|
||||
}
|
||||
if (newField.isRequired()) {
|
||||
log.debug("{} is required in this schema", newField.getName());
|
||||
newSchema.requiredFields.add(newField);
|
||||
}
|
||||
Collection<String> copyFields = copyFieldNames.get(newField.getName());
|
||||
if (copyFields != null) {
|
||||
for (String copyField : copyFields) {
|
||||
newSchema.registerCopyField(newField.getName(), copyField);
|
||||
}
|
||||
}
|
||||
if (null != newField.getDefaultValue()) {
|
||||
log.debug(newField.getName() + " contains default value: " + newField.getDefaultValue());
|
||||
newSchema.fieldsWithDefaultValue.add(newField);
|
||||
}
|
||||
// Run the callbacks on SchemaAware now that everything else is done
|
||||
for (SchemaAware aware : newSchema.schemaAware) {
|
||||
aware.inform(newSchema);
|
||||
if (newField.isRequired()) {
|
||||
log.debug("{} is required in this schema", newField.getName());
|
||||
newSchema.requiredFields.add(newField);
|
||||
}
|
||||
newSchema.refreshAnalyzers();
|
||||
success = newSchema.persistManagedSchema(false); // don't just create - update it if it already exists
|
||||
if (success) {
|
||||
log.debug("Added field(s): {}", newFields);
|
||||
Collection<String> copyFields = copyFieldNames.get(newField.getName());
|
||||
if (copyFields != null) {
|
||||
for (String copyField : copyFields) {
|
||||
newSchema.registerCopyField(newField.getName(), copyField);
|
||||
}
|
||||
}
|
||||
}
|
||||
// release the lock between tries to allow the schema reader to update the schema & schemaZkVersion
|
||||
// Run the callbacks on SchemaAware now that everything else is done
|
||||
for (SchemaAware aware : newSchema.schemaAware) {
|
||||
aware.inform(newSchema);
|
||||
}
|
||||
newSchema.refreshAnalyzers();
|
||||
success = newSchema.persistManagedSchema(false); // don't just create - update it if it already exists
|
||||
if (success) {
|
||||
log.debug("Added field(s): {}", newFields);
|
||||
} else {
|
||||
log.error("Failed to add field(s): {}", newFields);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
String msg = "This ManagedIndexSchema is not mutable.";
|
||||
|
@ -249,29 +260,29 @@ public final class ManagedIndexSchema extends IndexSchema {
|
|||
ManagedIndexSchema newSchema = null;
|
||||
if (isMutable) {
|
||||
boolean success = false;
|
||||
while (!success) { // optimistic concurrency
|
||||
// even though fields is volatile, we need to synchronize to avoid two addCopyFields
|
||||
// happening concurrently (and ending up missing one of them)
|
||||
synchronized (getSchemaUpdateLock()) {
|
||||
newSchema = shallowCopy(true);
|
||||
for (Map.Entry<String, Collection<String>> entry : copyFields.entrySet()) {
|
||||
//Key is the name of the field, values are the destinations
|
||||
// even though fields is volatile, we need to synchronize to avoid two addCopyFields
|
||||
// happening concurrently (and ending up missing one of them)
|
||||
synchronized (getSchemaUpdateLock()) {
|
||||
newSchema = shallowCopy(true);
|
||||
for (Map.Entry<String, Collection<String>> entry : copyFields.entrySet()) {
|
||||
//Key is the name of the field, values are the destinations
|
||||
|
||||
for (String destination : entry.getValue()) {
|
||||
newSchema.registerCopyField(entry.getKey(), destination);
|
||||
}
|
||||
}
|
||||
//TODO: move this common stuff out to shared methods
|
||||
// Run the callbacks on SchemaAware now that everything else is done
|
||||
for (SchemaAware aware : newSchema.schemaAware) {
|
||||
aware.inform(newSchema);
|
||||
}
|
||||
newSchema.refreshAnalyzers();
|
||||
success = newSchema.persistManagedSchema(false); // don't just create - update it if it already exists
|
||||
if (success) {
|
||||
log.debug("Added copy fields for {} sources", copyFields.size());
|
||||
for (String destination : entry.getValue()) {
|
||||
newSchema.registerCopyField(entry.getKey(), destination);
|
||||
}
|
||||
}
|
||||
//TODO: move this common stuff out to shared methods
|
||||
// Run the callbacks on SchemaAware now that everything else is done
|
||||
for (SchemaAware aware : newSchema.schemaAware) {
|
||||
aware.inform(newSchema);
|
||||
}
|
||||
newSchema.refreshAnalyzers();
|
||||
success = newSchema.persistManagedSchema(false); // don't just create - update it if it already exists
|
||||
if (success) {
|
||||
log.debug("Added copy fields for {} sources", copyFields.size());
|
||||
} else {
|
||||
log.error("Failed to add copy fields for {} sources", copyFields.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
return newSchema;
|
||||
|
|
|
@ -291,7 +291,7 @@ public class AddSchemaFieldsUpdateProcessorFactory extends UpdateRequestProcesso
|
|||
final IndexSchema oldSchema = core.getLatestSchema();
|
||||
List<SchemaField> newFields = new ArrayList<>();
|
||||
for (final String fieldName : doc.getFieldNames()) {
|
||||
if (selector.shouldMutate(fieldName)) {
|
||||
if (selector.shouldMutate(fieldName)) { // returns false if the field already exists in the latest schema
|
||||
String fieldTypeName = mapValueClassesToFieldType(doc.getField(fieldName));
|
||||
newFields.add(oldSchema.newField(fieldName, fieldTypeName, Collections.<String,Object>emptyMap()));
|
||||
}
|
||||
|
@ -323,6 +323,9 @@ public class AddSchemaFieldsUpdateProcessorFactory extends UpdateRequestProcesso
|
|||
} catch(ManagedIndexSchema.FieldExistsException e) {
|
||||
log.debug("At least one field to be added already exists in the schema - retrying.");
|
||||
// No action: at least one field to be added already exists in the schema, so retry
|
||||
// We should never get here, since selector.shouldMutate(field) will exclude already existing fields
|
||||
} catch(ManagedIndexSchema.SchemaChangedInZkException e) {
|
||||
log.debug("Schema changed while processing request - retrying.");
|
||||
}
|
||||
}
|
||||
super.processAdd(cmd);
|
||||
|
|
|
@ -83,7 +83,8 @@ public class TestCloudManagedSchemaAddField extends AbstractFullDistribZkTestBas
|
|||
public void doTest() throws Exception {
|
||||
setupHarnesses();
|
||||
|
||||
// First. add a bunch of fields, and verify each is present in all shards' schemas
|
||||
// First. add a bunch of fields, but do it fast enough
|
||||
// and verify shards' schemas after all of them are added
|
||||
int numFields = 25;
|
||||
for (int i = 1 ; i <= numFields ; ++i) {
|
||||
RestTestHarness publisher = restTestHarnesses.get(r.nextInt(restTestHarnesses.size()));
|
||||
|
@ -91,69 +92,32 @@ public class TestCloudManagedSchemaAddField extends AbstractFullDistribZkTestBas
|
|||
final String content = "{\"type\":\"text\",\"stored\":\"false\"}";
|
||||
String request = "/schema/fields/" + newFieldName + "?wt=xml";
|
||||
String response = publisher.put(request, content);
|
||||
final long addFieldTime = System.currentTimeMillis();
|
||||
String result = publisher.validateXPath
|
||||
(response, "/response/lst[@name='responseHeader']/int[@name='status'][.='0']");
|
||||
if (null != result) {
|
||||
fail("PUT REQUEST FAILED: xpath=" + result + " request=" + request
|
||||
+ " content=" + content + " response=" + response);
|
||||
}
|
||||
}
|
||||
|
||||
int maxAttempts = 40;
|
||||
long retryPauseMillis = 20;
|
||||
Thread.sleep(100000);
|
||||
|
||||
for (int i = 1 ; i <= numFields ; ++i) {
|
||||
String newFieldName = "newfield" + i;
|
||||
for (RestTestHarness client : restTestHarnesses) {
|
||||
boolean stillTrying = true;
|
||||
for (int attemptNum = 1; stillTrying && attemptNum <= maxAttempts ; ++attemptNum) {
|
||||
request = "/schema/fields/" + newFieldName + "?wt=xml";
|
||||
response = client.query(request);
|
||||
long elapsedTimeMillis = System.currentTimeMillis() - addFieldTime;
|
||||
result = client.validateXPath(response,
|
||||
"/response/lst[@name='responseHeader']/int[@name='status'][.='0']",
|
||||
"/response/lst[@name='field']/str[@name='name'][.='" + newFieldName + "']");
|
||||
if (null == result) {
|
||||
stillTrying = false;
|
||||
if (attemptNum > 1) {
|
||||
log.info("On attempt #" + attemptNum + ", successful request " + request + " against server "
|
||||
+ client.getBaseURL() + " after " + elapsedTimeMillis + " ms");
|
||||
}
|
||||
} else {
|
||||
if (attemptNum == maxAttempts || ! response.contains("Field '" + newFieldName + "' not found.")) {
|
||||
String msg = "QUERY FAILED: xpath=" + result + " request=" + request + " response=" + response;
|
||||
if (attemptNum == maxAttempts) {
|
||||
msg = "Max retry count " + maxAttempts + " exceeded after " + elapsedTimeMillis +" ms. " + msg;
|
||||
}
|
||||
log.error(msg);
|
||||
fail(msg);
|
||||
}
|
||||
Thread.sleep(retryPauseMillis);
|
||||
String request = "/schema/fields/" + newFieldName + "?wt=xml";
|
||||
String response = client.query(request);
|
||||
String result = client.validateXPath(response,
|
||||
"/response/lst[@name='responseHeader']/int[@name='status'][.='0']",
|
||||
"/response/lst[@name='field']/str[@name='name'][.='" + newFieldName + "']");
|
||||
if (null != result) {
|
||||
if (response.contains("Field '" + newFieldName + "' not found.")) {
|
||||
String msg = "QUERY FAILED: xpath=" + result + " request=" + request + " response=" + response;
|
||||
log.error(msg);
|
||||
fail(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add a doc with one of the newly created fields
|
||||
String fieldName = "newfield" + (r.nextInt(numFields) + 1);
|
||||
|
||||
int addDocClientNum = r.nextInt(restTestHarnesses.size());
|
||||
RestTestHarness client = restTestHarnesses.get(addDocClientNum);
|
||||
String updateResult = client.validateUpdate(adoc(fieldName, "word1 word2", "id", "88"));
|
||||
assertNull("Error adding a document with field " + fieldName + ": " + updateResult, updateResult);
|
||||
updateResult = client.validateUpdate(BaseTestHarness.commit());
|
||||
assertNull("Error committing: " + updateResult, updateResult);
|
||||
|
||||
// Query for the newly added doc against a different client
|
||||
int queryDocClientNum = r.nextInt(restTestHarnesses.size());
|
||||
while (queryDocClientNum == addDocClientNum) {
|
||||
queryDocClientNum = r.nextInt(restTestHarnesses.size());
|
||||
}
|
||||
client = restTestHarnesses.get(queryDocClientNum);
|
||||
String response = client.query("/select?q=" + fieldName + ":word2");
|
||||
String queryResult = client.validateXPath(response,
|
||||
"/response/result[@name='response'][@numFound='1']",
|
||||
"count(/response/result[@name='response']/doc/int[@name='id']) = 1",
|
||||
"/response/result[@name='response']/doc/int[@name='id'] = '88'");
|
||||
assertNull("Error querying for a document with field " + fieldName + ": " + queryResult
|
||||
+ " response=" + response, queryResult);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
package org.apache.solr.schema;
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrServer;
|
||||
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
|
||||
import org.apache.solr.util.BaseTestHarness;
|
||||
import org.apache.solr.util.RESTfulServerProvider;
|
||||
import org.apache.solr.util.RestTestHarness;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.restlet.ext.servlet.ServerServlet;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
public class TestCloudManagedSchemaAddFields extends AbstractFullDistribZkTestBase {
|
||||
private static final Logger log = LoggerFactory.getLogger(TestCloudManagedSchemaAddField.class);
|
||||
|
||||
public TestCloudManagedSchemaAddFields() {
|
||||
super();
|
||||
fixShardCount = true;
|
||||
|
||||
sliceCount = 4;
|
||||
shardCount = 8;
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void initSysProperties() {
|
||||
System.setProperty("managed.schema.mutable", "true");
|
||||
System.setProperty("enable.update.log", "true");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getCloudSolrConfig() {
|
||||
return "solrconfig-managed-schema.xml";
|
||||
}
|
||||
|
||||
@Override
|
||||
public SortedMap<ServletHolder,String> getExtraServlets() {
|
||||
final SortedMap<ServletHolder,String> extraServlets = new TreeMap<>();
|
||||
final ServletHolder solrRestApi = new ServletHolder("SolrSchemaRestApi", ServerServlet.class);
|
||||
solrRestApi.setInitParameter("org.restlet.application", "org.apache.solr.rest.SolrSchemaRestApi");
|
||||
extraServlets.put(solrRestApi, "/schema/*"); // '/schema/*' matches '/schema', '/schema/', and '/schema/whatever...'
|
||||
return extraServlets;
|
||||
}
|
||||
|
||||
private List<RestTestHarness> restTestHarnesses = new ArrayList<>();
|
||||
|
||||
private void setupHarnesses() {
|
||||
for (int i = 0 ; i < clients.size() ; ++i) {
|
||||
final HttpSolrServer client = (HttpSolrServer)clients.get(i);
|
||||
RestTestHarness harness = new RestTestHarness(new RESTfulServerProvider() {
|
||||
@Override
|
||||
public String getBaseURL() {
|
||||
return client.getBaseURL();
|
||||
}
|
||||
});
|
||||
restTestHarnesses.add(harness);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doTest() throws Exception {
|
||||
setupHarnesses();
|
||||
|
||||
// First. add a bunch of fields, but do it fast enough
|
||||
// and verify shards' schemas after all of them are added
|
||||
int numFields = 200;
|
||||
for (int i = 1 ; i <= numFields ; ++i) {
|
||||
RestTestHarness publisher = restTestHarnesses.get(r.nextInt(restTestHarnesses.size()));
|
||||
String newFieldName = "newfield" + i;
|
||||
final String content = "[{\"name\":\""+newFieldName+"\",\"type\":\"text\",\"stored\":\"false\"}]";
|
||||
String request = "/schema/fields/?wt=xml";
|
||||
String response = publisher.post(request, content);
|
||||
String result = publisher.validateXPath
|
||||
(response, "/response/lst[@name='responseHeader']/int[@name='status'][.='0']");
|
||||
if (null != result) {
|
||||
fail("POST REQUEST FAILED: xpath=" + result + " request=" + request
|
||||
+ " content=" + content + " response=" + response);
|
||||
}
|
||||
}
|
||||
|
||||
Thread.sleep(100000);
|
||||
|
||||
for (int i = 1 ; i <= numFields ; ++i) {
|
||||
String newFieldName = "newfield" + i;
|
||||
for (RestTestHarness client : restTestHarnesses) {
|
||||
String request = "/schema/fields/" + newFieldName + "?wt=xml";
|
||||
String response = client.query(request);
|
||||
String result = client.validateXPath(response,
|
||||
"/response/lst[@name='responseHeader']/int[@name='status'][.='0']",
|
||||
"/response/lst[@name='field']/str[@name='name'][.='" + newFieldName + "']");
|
||||
if (null != result) {
|
||||
if (response.contains("Field '" + newFieldName + "' not found.")) {
|
||||
String msg = "QUERY FAILED: xpath=" + result + " request=" + request + " response=" + response;
|
||||
log.error(msg);
|
||||
fail(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
package org.apache.solr.schema;
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrServer;
|
||||
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
|
||||
import org.apache.solr.util.RESTfulServerProvider;
|
||||
import org.apache.solr.util.RestTestHarness;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.restlet.ext.servlet.ServerServlet;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
public class TestCloudManagedSchemaCopyFields extends AbstractFullDistribZkTestBase {
|
||||
private static final Logger log = LoggerFactory.getLogger(TestCloudManagedSchemaAddField.class);
|
||||
|
||||
public TestCloudManagedSchemaCopyFields() {
|
||||
super();
|
||||
fixShardCount = true;
|
||||
|
||||
sliceCount = 4;
|
||||
shardCount = 8;
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void initSysProperties() {
|
||||
System.setProperty("managed.schema.mutable", "true");
|
||||
System.setProperty("enable.update.log", "true");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getCloudSolrConfig() {
|
||||
return "solrconfig-managed-schema.xml";
|
||||
}
|
||||
|
||||
@Override
|
||||
public SortedMap<ServletHolder,String> getExtraServlets() {
|
||||
final SortedMap<ServletHolder,String> extraServlets = new TreeMap<>();
|
||||
final ServletHolder solrRestApi = new ServletHolder("SolrSchemaRestApi", ServerServlet.class);
|
||||
solrRestApi.setInitParameter("org.restlet.application", "org.apache.solr.rest.SolrSchemaRestApi");
|
||||
extraServlets.put(solrRestApi, "/schema/*"); // '/schema/*' matches '/schema', '/schema/', and '/schema/whatever...'
|
||||
return extraServlets;
|
||||
}
|
||||
|
||||
private List<RestTestHarness> restTestHarnesses = new ArrayList<>();
|
||||
|
||||
private void setupHarnesses() {
|
||||
for (int i = 0 ; i < clients.size() ; ++i) {
|
||||
final HttpSolrServer client = (HttpSolrServer)clients.get(i);
|
||||
RestTestHarness harness = new RestTestHarness(new RESTfulServerProvider() {
|
||||
@Override
|
||||
public String getBaseURL() {
|
||||
return client.getBaseURL();
|
||||
}
|
||||
});
|
||||
restTestHarnesses.add(harness);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doTest() throws Exception {
|
||||
setupHarnesses();
|
||||
|
||||
// First, add the same copy field directive a bunch of times.
|
||||
// Then verify each shard's schema has it.
|
||||
int numFields = 200;
|
||||
for (int i = 1 ; i <= numFields ; ++i) {
|
||||
RestTestHarness publisher = restTestHarnesses.get(r.nextInt(restTestHarnesses.size()));
|
||||
final String content = "[{\"source\":\""+"sku1"+"\",\"dest\":[\"sku2\"]}]";
|
||||
String request = "/schema/copyfields/?wt=xml";
|
||||
String response = publisher.post(request, content);
|
||||
String result = publisher.validateXPath
|
||||
(response, "/response/lst[@name='responseHeader']/int[@name='status'][.='0']");
|
||||
if (null != result) {
|
||||
fail("POST REQUEST FAILED: xpath=" + result + " request=" + request
|
||||
+ " content=" + content + " response=" + response);
|
||||
}
|
||||
}
|
||||
|
||||
Thread.sleep(100000);
|
||||
|
||||
String request = "/schema/copyfields/?wt=xml&indent=on&source.fl=sku1";
|
||||
for (RestTestHarness client : restTestHarnesses) {
|
||||
String response = client.query(request);
|
||||
String result = client.validateXPath(response,
|
||||
"/response/lst[@name='responseHeader']/int[@name='status'][.='0']",
|
||||
"/response/arr[@name='copyFields']/lst/str[@name='dest'][.='sku2']");
|
||||
if (null != result) {
|
||||
fail("QUERY FAILED: xpath=" + result + " request=" + request + " response=" + response);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue