SOLR-6249: Schema API changes return success before all cores are updated

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1628181 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy Potter 2014-09-29 13:41:04 +00:00
parent 6c70700a59
commit 975f1b5bff
18 changed files with 491 additions and 85 deletions

View File

@ -255,6 +255,11 @@ Other Changes
* SOLR-6453: Stop throwing an error message from Overseer when node exits (Ramkumar Aiyengar, Noble Paul)
* SOLR-6249: Schema API changes return success before all cores are updated; client application
can provide the optional updateTimeoutSecs parameter to cause the server handling the
managed schema update to block until all replicas of the same collection have processed the
update or until the specified timeout is reached (Timothy Potter)
================== 4.10.1 ==================
Bug Fixes

View File

@ -28,6 +28,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.schema.ZkIndexSchemaReader;
import org.apache.zookeeper.KeeperException;
/**
@ -38,6 +39,7 @@ public class ZkSolrResourceLoader extends SolrResourceLoader {
private final String collectionZkPath;
private ZkController zkController;
private ZkIndexSchemaReader zkIndexSchemaReader;
public ZkSolrResourceLoader(String instanceDir, String collection,
ZkController zooKeeperController) {
@ -129,4 +131,10 @@ public class ZkSolrResourceLoader extends SolrResourceLoader {
public ZkController getZkController() {
return zkController;
}
/** Registers the reader that watches the managed-schema znode for this core's collection. */
public void setZkIndexSchemaReader(ZkIndexSchemaReader zkIndexSchemaReader) {
this.zkIndexSchemaReader = zkIndexSchemaReader;
}
/** Returns the reader registered via {@link #setZkIndexSchemaReader}, or null if none was set. */
public ZkIndexSchemaReader getZkIndexSchemaReader() { return zkIndexSchemaReader; }
}

View File

@ -18,7 +18,6 @@ package org.apache.solr.rest;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.SolrCore;
@ -48,6 +47,7 @@ import java.net.URLDecoder;
*/
public abstract class BaseSolrResource extends ServerResource {
protected static final String SHOW_DEFAULTS = "showDefaults";
public static final String UPDATE_TIMEOUT_SECS = "updateTimeoutSecs";
private SolrCore solrCore;
private IndexSchema schema;
@ -55,12 +55,14 @@ public abstract class BaseSolrResource extends ServerResource {
private SolrQueryResponse solrResponse;
private QueryResponseWriter responseWriter;
private String contentType;
private int updateTimeoutSecs = -1;
public SolrCore getSolrCore() { return solrCore; }
public IndexSchema getSchema() { return schema; }
public SolrQueryRequest getSolrRequest() { return solrRequest; }
public SolrQueryResponse getSolrResponse() { return solrResponse; }
public String getContentType() { return contentType; }
protected int getUpdateTimeoutSecs() { return updateTimeoutSecs; }
protected BaseSolrResource() {
super();
@ -122,6 +124,14 @@ public abstract class BaseSolrResource extends ServerResource {
solrRequest.getContext().put("webapp", firstPathElement); // Context path
}
SolrCore.preDecorateResponse(solrRequest, solrResponse);
// client application can set a timeout for update requests
Object updateTimeoutSecsParam = getSolrRequest().getParams().get(UPDATE_TIMEOUT_SECS);
if (updateTimeoutSecsParam != null)
updateTimeoutSecs = (updateTimeoutSecsParam instanceof Number)
? ((Number) updateTimeoutSecsParam).intValue()
: Integer.parseInt(updateTimeoutSecsParam.toString());
}
}
} catch (Throwable t) {

View File

@ -29,6 +29,7 @@ import org.apache.solr.rest.schema.FieldTypeResource;
import org.apache.solr.rest.schema.SchemaNameResource;
import org.apache.solr.rest.schema.SchemaSimilarityResource;
import org.apache.solr.rest.schema.SchemaVersionResource;
import org.apache.solr.rest.schema.SchemaZkVersionResource;
import org.apache.solr.rest.schema.SolrQueryParserDefaultOperatorResource;
import org.apache.solr.rest.schema.SolrQueryParserResource;
import org.apache.solr.rest.schema.UniqueKeyFieldResource;
@ -78,6 +79,8 @@ public class SolrSchemaRestApi extends Application {
public static final String UNIQUE_KEY_FIELD = IndexSchema.UNIQUE_KEY.toLowerCase(Locale.ROOT);
public static final String UNIQUE_KEY_FIELD_PATH = "/" + UNIQUE_KEY_FIELD;
public static final String ZK_VERSION_PATH = "/zkversion";
/**
* Returns reserved endpoints under /schema
@ -95,6 +98,7 @@ public class SolrSchemaRestApi extends Application {
reservedEndpoints.add(RestManager.SCHEMA_BASE_PATH + SOLR_QUERY_PARSER_PATH);
reservedEndpoints.add(RestManager.SCHEMA_BASE_PATH + DEFAULT_OPERATOR_PATH);
reservedEndpoints.add(RestManager.SCHEMA_BASE_PATH + UNIQUE_KEY_FIELD_PATH);
reservedEndpoints.add(RestManager.SCHEMA_BASE_PATH + ZK_VERSION_PATH);
return Collections.unmodifiableSet(reservedEndpoints);
}
@ -156,6 +160,8 @@ public class SolrSchemaRestApi extends Application {
router.attach(SOLR_QUERY_PARSER_PATH, SolrQueryParserResource.class);
router.attach(DEFAULT_OPERATOR_PATH, SolrQueryParserDefaultOperatorResource.class);
router.attach(ZK_VERSION_PATH, SchemaZkVersionResource.class);
router.attachDefault(RestManager.ManagedEndpoint.class);
// attach all the dynamically registered schema resources

View File

@ -16,10 +16,13 @@ package org.apache.solr.rest.schema;
* limitations under the License.
*/
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.rest.BaseSolrResource;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.ManagedIndexSchema;
import org.apache.solr.schema.SchemaField;
import org.restlet.resource.ResourceException;
@ -100,6 +103,29 @@ abstract class BaseFieldResource extends BaseSolrResource {
return properties;
}
/**
* When running in cloud mode, waits for a schema update to be
* applied by all active replicas of the current collection.
* <p>
* This is a no-op unless all of the following hold: the client supplied a
* positive updateTimeoutSecs request parameter, {@code newSchema} is non-null,
* and the schema's resource loader is a {@link ZkSolrResourceLoader} (i.e. the
* core is running in SolrCloud mode).
*
* @param newSchema the schema that was just persisted; its ZooKeeper version is
*        the version every other replica must reach before this method returns
*/
protected void waitForSchemaUpdateToPropagate(IndexSchema newSchema) {
// If using ZooKeeper and the client application has requested an update timeout, then block until all
// active replicas for this collection process the updated schema
if (getUpdateTimeoutSecs() > 0 && newSchema != null &&
newSchema.getResourceLoader() instanceof ZkSolrResourceLoader)
{
CoreDescriptor cd = getSolrCore().getCoreDescriptor();
String collection = cd.getCollectionName();
if (collection != null) { // presumably null when the core is not part of a collection — confirm
ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader) newSchema.getResourceLoader();
ManagedIndexSchema.waitForSchemaZkVersionAgreement(collection,
cd.getCloudDescriptor().getCoreNodeName(),
((ManagedIndexSchema) newSchema).getSchemaZkVersion(),
zkLoader.getZkController(),
getUpdateTimeoutSecs());
}
}
}
// protected access on this class triggers a bug in javadoc generation caught by
// documentation-link: "BROKEN LINK" reported in javadoc for classes using
// NewFieldArguments because the link target file is BaseFieldResource.NewFieldArguments,

View File

@ -17,10 +17,13 @@ package org.apache.solr.rest.schema;
* limitations under the License.
*/
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.rest.BaseSolrResource;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.ManagedIndexSchema;
import org.restlet.resource.ResourceException;
import java.util.List;
@ -57,4 +60,40 @@ abstract class BaseFieldTypeResource extends BaseSolrResource {
/** Return a list of names of DynamicFields that have the given FieldType */
protected abstract List<String> getDynamicFieldsWithFieldType(FieldType fieldType);
/**
* Adds one or more new FieldType definitions to the managed schema for the given core.
* <p>
* Uses optimistic concurrency: if another node changes the schema in ZooKeeper while
* this update is in flight, the update is retried against the latest schema. In cloud
* mode, when the client supplied a positive updateTimeoutSecs, this then blocks until
* all active replicas of the collection have applied the new schema version.
*
* @param newFieldTypes the field types to add
* @param oldSchema     the schema to apply the update against (replaced on retry)
*/
protected void addNewFieldTypes(List<FieldType> newFieldTypes, ManagedIndexSchema oldSchema) {
IndexSchema newSchema = null;
boolean success = false;
while (!success) {
try {
synchronized (oldSchema.getSchemaUpdateLock()) {
newSchema = oldSchema.addFieldTypes(newFieldTypes);
getSolrCore().setLatestSchema(newSchema);
success = true;
}
} catch (ManagedIndexSchema.SchemaChangedInZkException e) {
// another node changed the schema first; reload the latest and retry
oldSchema = (ManagedIndexSchema)getSolrCore().getLatestSchema();
}
}
// If using ZooKeeper and the client application has requested an update timeout, then block until all
// active replicas for this collection process the updated schema
// NOTE(review): this wait logic duplicates BaseFieldResource.waitForSchemaUpdateToPropagate;
// consider hoisting the shared code into BaseSolrResource
if (getUpdateTimeoutSecs() > 0 && newSchema != null &&
newSchema.getResourceLoader() instanceof ZkSolrResourceLoader)
{
CoreDescriptor cd = getSolrCore().getCoreDescriptor();
String collection = cd.getCollectionName();
if (collection != null) {
ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader) newSchema.getResourceLoader();
ManagedIndexSchema.waitForSchemaZkVersionAgreement(collection,
cd.getCloudDescriptor().getCoreNodeName(),
((ManagedIndexSchema) newSchema).getSchemaZkVersion(),
zkLoader.getZkController(),
getUpdateTimeoutSecs());
}
}
}
}

View File

@ -168,11 +168,12 @@ public class CopyFieldCollectionResource extends BaseFieldResource implements GE
log.error(message.toString().trim());
throw new SolrException(ErrorCode.BAD_REQUEST, message.toString().trim());
}
IndexSchema newSchema = null;
boolean success = false;
while (!success) {
try {
synchronized (oldSchema.getSchemaUpdateLock()) {
IndexSchema newSchema = oldSchema.addCopyFields(fieldsToCopy);
newSchema = oldSchema.addCopyFields(fieldsToCopy);
if (null != newSchema) {
getSolrCore().setLatestSchema(newSchema);
success = true;
@ -185,6 +186,7 @@ public class CopyFieldCollectionResource extends BaseFieldResource implements GE
oldSchema = (ManagedIndexSchema)getSolrCore().getLatestSchema();
}
}
waitForSchemaUpdateToPropagate(newSchema);
}
}
}

View File

@ -159,6 +159,7 @@ public class DynamicFieldCollectionResource extends BaseFieldResource implements
newDynamicFields.add(oldSchema.newDynamicField(fieldNamePattern, fieldType, map));
newDynamicFieldArguments.add(new NewFieldArguments(fieldNamePattern, fieldType, map));
}
IndexSchema newSchema = null;
boolean firstAttempt = true;
boolean success = false;
while ( ! success) {
@ -177,7 +178,7 @@ public class DynamicFieldCollectionResource extends BaseFieldResource implements
}
firstAttempt = false;
synchronized (oldSchema.getSchemaUpdateLock()) {
IndexSchema newSchema = oldSchema.addDynamicFields(newDynamicFields, copyFields);
newSchema = oldSchema.addDynamicFields(newDynamicFields, copyFields);
if (null != newSchema) {
getSolrCore().setLatestSchema(newSchema);
success = true;
@ -190,6 +191,9 @@ public class DynamicFieldCollectionResource extends BaseFieldResource implements
oldSchema = (ManagedIndexSchema)getSolrCore().getLatestSchema();
}
}
waitForSchemaUpdateToPropagate(newSchema);
}
}
}

View File

@ -157,12 +157,13 @@ public class DynamicFieldResource extends BaseFieldResource implements GETable,
if (copyFieldNames != null) {
map.remove(IndexSchema.COPY_FIELDS);
}
IndexSchema newSchema = null;
boolean success = false;
while ( ! success) {
try {
SchemaField newDynamicField = oldSchema.newDynamicField(fieldNamePattern, fieldType, map);
synchronized (oldSchema.getSchemaUpdateLock()) {
IndexSchema newSchema = oldSchema.addDynamicField(newDynamicField, copyFieldNames);
newSchema = oldSchema.addDynamicField(newDynamicField, copyFieldNames);
if (null != newSchema) {
getSolrCore().setLatestSchema(newSchema);
success = true;
@ -175,6 +176,8 @@ public class DynamicFieldResource extends BaseFieldResource implements GETable,
oldSchema = (ManagedIndexSchema)getSolrCore().getLatestSchema();
}
}
// if in cloud mode, wait for schema updates to propagate to all replicas
waitForSchemaUpdateToPropagate(newSchema);
}
}
}

View File

@ -17,10 +17,12 @@ package org.apache.solr.rest.schema;
*/
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.rest.GETable;
import org.apache.solr.rest.POSTable;
import org.apache.solr.schema.IndexSchema;
@ -177,6 +179,7 @@ public class FieldCollectionResource extends BaseFieldResource implements GETabl
newFields.add(oldSchema.newField(fieldName, fieldType, map));
newFieldArguments.add(new NewFieldArguments(fieldName, fieldType, map));
}
IndexSchema newSchema = null;
boolean firstAttempt = true;
boolean success = false;
while (!success) {
@ -196,7 +199,7 @@ public class FieldCollectionResource extends BaseFieldResource implements GETabl
}
firstAttempt = false;
synchronized (oldSchema.getSchemaUpdateLock()) {
IndexSchema newSchema = oldSchema.addFields(newFields, copyFields);
newSchema = oldSchema.addFields(newFields, copyFields);
if (null != newSchema) {
getSolrCore().setLatestSchema(newSchema);
success = true;
@ -209,6 +212,7 @@ public class FieldCollectionResource extends BaseFieldResource implements GETabl
oldSchema = getSolrCore().getLatestSchema();
}
}
waitForSchemaUpdateToPropagate(newSchema);
}
}
}

View File

@ -16,8 +16,11 @@ package org.apache.solr.rest.schema;
* limitations under the License.
*/
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.rest.GETable;
import org.apache.solr.rest.PUTable;
import org.apache.solr.schema.IndexSchema;
@ -162,12 +165,14 @@ public class FieldResource extends BaseFieldResource implements GETable, PUTable
if (copyFieldNames != null) {
map.remove(IndexSchema.COPY_FIELDS);
}
IndexSchema newSchema = null;
boolean success = false;
while (!success) {
try {
SchemaField newField = oldSchema.newField(fieldName, fieldType, map);
synchronized (oldSchema.getSchemaUpdateLock()) {
IndexSchema newSchema = oldSchema.addField(newField, copyFieldNames);
newSchema = oldSchema.addField(newField, copyFieldNames);
if (null != newSchema) {
getSolrCore().setLatestSchema(newSchema);
success = true;
@ -180,6 +185,7 @@ public class FieldResource extends BaseFieldResource implements GETable, PUTable
oldSchema = (ManagedIndexSchema)getSolrCore().getLatestSchema();
}
}
waitForSchemaUpdateToPropagate(newSchema);
}
}
}

View File

@ -19,7 +19,6 @@ package org.apache.solr.rest.schema;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.SolrCore;
import org.apache.solr.rest.GETable;
import org.apache.solr.rest.POSTable;
import org.apache.solr.schema.FieldType;
@ -195,23 +194,4 @@ public class FieldTypeCollectionResource extends BaseFieldTypeResource implement
// now deploy the added types (all or nothing)
addNewFieldTypes(newFieldTypes, oldSchema);
}
/**
* Adds one or more new FieldType definitions to the managed schema for the given core.
*/
protected void addNewFieldTypes(List<FieldType> newFieldTypes, ManagedIndexSchema oldSchema) {
boolean success = false;
while (!success) {
try {
synchronized (oldSchema.getSchemaUpdateLock()) {
IndexSchema newSchema = oldSchema.addFieldTypes(newFieldTypes);
getSolrCore().setLatestSchema(newSchema);
success = true;
}
} catch (ManagedIndexSchema.SchemaChangedInZkException e) {
log.debug("Schema changed while processing request, retrying");
oldSchema = (ManagedIndexSchema)getSolrCore().getLatestSchema();
}
}
}
}

View File

@ -164,28 +164,7 @@ public class FieldTypeResource extends BaseFieldTypeResource implements GETable,
protected void addOrUpdateFieldType(Map<String,Object> fieldTypeJson) {
ManagedIndexSchema oldSchema = (ManagedIndexSchema) getSchema();
FieldType newFieldType = buildFieldTypeFromJson(oldSchema, typeName, fieldTypeJson);
addNewFieldType(newFieldType, oldSchema);
}
/**
* Adds a new FieldType definitions to the managed schema for the given core.
*/
protected void addNewFieldType(FieldType newFieldType, ManagedIndexSchema oldSchema) {
boolean success = false;
while (!success) {
try {
Object updateLock = oldSchema.getSchemaUpdateLock();
synchronized (updateLock) {
IndexSchema newSchema = oldSchema.addFieldTypes(Collections.singletonList(newFieldType));
getSolrCore().setLatestSchema(newSchema);
success = true;
}
} catch (ManagedIndexSchema.SchemaChangedInZkException e) {
log.info("Schema changed while processing request, retrying");
oldSchema = (ManagedIndexSchema)getSolrCore().getLatestSchema();
}
}
addNewFieldTypes(Collections.singletonList(newFieldType), oldSchema);
}
/**

View File

@ -0,0 +1,78 @@
package org.apache.solr.rest.schema;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.rest.BaseSolrResource;
import org.apache.solr.rest.GETable;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.ManagedIndexSchema;
import org.apache.solr.schema.ZkIndexSchemaReader;
import org.restlet.representation.Representation;
import org.restlet.resource.ResourceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class responds to requests at /solr/(corename)/schema/zkversion and reports
* the ZooKeeper znode version of the core's managed schema. If the client passes
* refreshIfBelowVersion and the local schema is older, the replica refreshes its
* schema from ZooKeeper before reporting.
*/
public class SchemaZkVersionResource extends BaseSolrResource implements GETable {
private static final Logger log = LoggerFactory.getLogger(SchemaZkVersionResource.class);
// minimum schema version the client expects; -1 means "just report whatever we have"
protected int refreshIfBelowVersion = -1;
public SchemaZkVersionResource() {
super();
}
/**
* Parses the optional refreshIfBelowVersion request parameter, which may arrive
* as either a Number or a String depending on how the request was encoded.
*/
@Override
public void doInit() throws ResourceException {
super.doInit();
// sometimes the client knows which version it expects
Object refreshParam = getSolrRequest().getParams().get("refreshIfBelowVersion");
if (refreshParam != null)
refreshIfBelowVersion = (refreshParam instanceof Number) ? ((Number)refreshParam).intValue()
: Integer.parseInt(refreshParam.toString());
}
/**
* Returns the schema's current ZooKeeper version in the "zkversion" response field.
* Reports -1 when the core's schema is not a ManagedIndexSchema. Exceptions are
* routed into the Solr response rather than propagated.
*/
@Override
public Representation get() {
try {
int zkVersion = -1;
IndexSchema schema = getSchema();
if (schema instanceof ManagedIndexSchema) {
ManagedIndexSchema managed = (ManagedIndexSchema)schema;
zkVersion = managed.getSchemaZkVersion();
if (refreshIfBelowVersion != -1 && zkVersion < refreshIfBelowVersion) {
log.info("\n\n\n REFRESHING SCHEMA (refreshIfBelowVersion="+refreshIfBelowVersion+") before returning version! \n\n\n");
ZkSolrResourceLoader zkSolrResourceLoader = (ZkSolrResourceLoader)getSolrCore().getResourceLoader();
ZkIndexSchemaReader zkIndexSchemaReader = zkSolrResourceLoader.getZkIndexSchemaReader();
managed = zkIndexSchemaReader.refreshSchemaFromZk(refreshIfBelowVersion);
zkVersion = managed.getSchemaZkVersion();
}
}
getSolrResponse().add("zkversion", zkVersion);
} catch (Exception e) {
getSolrResponse().setException(e);
}
handlePostExecution(log);
return new SolrOutputRepresentation();
}
}

View File

@ -22,22 +22,36 @@ import org.apache.lucene.analysis.util.CharFilterFactory;
import org.apache.lucene.analysis.util.TokenFilterFactory;
import org.apache.lucene.analysis.util.TokenizerFactory;
import org.apache.solr.analysis.TokenizerChain;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.Config;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.rest.schema.FieldTypeXmlAdapter;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.FileUtils;
import org.apache.lucene.analysis.util.ResourceLoaderAware;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
@ -49,7 +63,6 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -58,8 +71,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/** Solr-managed schema - non-user-editable, but can be mutable via internal and external REST API requests. */
public final class ManagedIndexSchema extends IndexSchema {
@ -192,6 +211,169 @@ public final class ManagedIndexSchema extends IndexSchema {
return success;
}
/**
* Block up to a specified maximum time until we see agreement on the schema
* version in ZooKeeper across all active replicas for a collection.
*
* @param collection        collection whose replicas must agree on the schema version
* @param localCoreNodeName core node name of the core that made the update (excluded from polling)
* @param schemaZkVersion   ZooKeeper schema version every replica must reach
* @param zkController      provides access to cluster state for locating replicas
* @param maxWaitSecs       maximum seconds to wait before declaring failure
* @throws SolrException (SERVER_ERROR) if any replica fails to reach the version in time
*/
public static void waitForSchemaZkVersionAgreement(String collection, String localCoreNodeName,
int schemaZkVersion, ZkController zkController, int maxWaitSecs)
{
long startMs = System.currentTimeMillis();
// get a list of active replica cores to query for the schema zk version (skipping this core of course)
List<GetZkSchemaVersionCallable> concurrentTasks = new ArrayList<>();
for (String coreUrl : getActiveReplicaCoreUrls(zkController, collection, localCoreNodeName))
concurrentTasks.add(new GetZkSchemaVersionCallable(coreUrl, schemaZkVersion));
if (concurrentTasks.isEmpty())
return; // nothing to wait for ...
log.info("Waiting up to "+maxWaitSecs+" secs for "+concurrentTasks.size()+
" replicas to apply schema update version "+schemaZkVersion+" for collection "+collection);
// use an executor service to invoke schema zk version requests in parallel with a max wait time
int poolSize = Math.min(concurrentTasks.size(), 10);
ExecutorService parallelExecutor =
Executors.newFixedThreadPool(poolSize, new DefaultSolrThreadFactory("managedSchemaExecutor"));
try {
List<Future<Integer>> results =
parallelExecutor.invokeAll(concurrentTasks, maxWaitSecs, TimeUnit.SECONDS);
// determine whether all replicas have the update
List<String> failedList = null; // lazily init'd
for (int f=0; f < results.size(); f++) {
int vers = -1;
Future<Integer> next = results.get(f);
if (next.isDone() && !next.isCancelled()) {
// looks to have finished, but need to check the version value too
try {
vers = next.get();
} catch (ExecutionException e) {
// shouldn't happen since we checked isDone and !isCancelled, but log it if it does
log.warn("Failed to get schema version from replica task due to: "+e);
}
}
// A task cancelled by the invokeAll timeout reports the -1 sentinel, but a task
// interrupted mid-poll can complete "normally" with a stale version >= 0 that is
// still below the expected one — so compare against the expected version rather
// than only checking for -1.
if (vers < schemaZkVersion) {
String coreUrl = concurrentTasks.get(f).coreUrl;
log.warn("Core "+coreUrl+" has not applied schema version "+schemaZkVersion+
"; last version seen was "+vers);
if (failedList == null) failedList = new ArrayList<>();
failedList.add(coreUrl);
}
}
// if any tasks haven't completed within the specified timeout, it's an error
if (failedList != null)
throw new SolrException(ErrorCode.SERVER_ERROR, failedList.size()+" out of "+(concurrentTasks.size() + 1)+
" replicas failed to update their schema to version "+schemaZkVersion+" within "+
maxWaitSecs+" seconds! Failed cores: "+failedList);
} catch (InterruptedException ie) {
log.warn("Core "+localCoreNodeName+" was interrupted waiting for schema version "+schemaZkVersion+
" to propagate to "+concurrentTasks.size()+" replicas for collection "+collection);
Thread.currentThread().interrupt();
} finally {
if (!parallelExecutor.isShutdown())
parallelExecutor.shutdownNow();
}
long diffMs = (System.currentTimeMillis() - startMs);
log.info("Took "+Math.round(diffMs/1000d)+" secs for "+concurrentTasks.size()+
" replicas to apply schema update version "+schemaZkVersion+" for collection "+collection);
}
/**
* Collects the core URLs of every live, active replica of the given collection,
* excluding the local core identified by localCoreNodeName. Returns an empty
* list when the collection has no active slices.
*/
protected static List<String> getActiveReplicaCoreUrls(ZkController zkController, String collection, String localCoreNodeName) {
List<String> coreUrls = new ArrayList<>();
ZkStateReader stateReader = zkController.getZkStateReader();
ClusterState clusterState = stateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
Collection<Slice> slices = clusterState.getActiveSlices(collection);
if (slices == null || slices.isEmpty()) {
return coreUrls;
}
for (Slice slice : slices) {
Map<String, Replica> replicas = slice.getReplicasMap();
if (replicas == null) {
continue;
}
for (Map.Entry<String, Replica> replicaEntry : replicas.entrySet()) {
Replica replica = replicaEntry.getValue();
boolean isLocalCore = localCoreNodeName.equals(replica.getName());
boolean isActive = ZkStateReader.ACTIVE.equals(replica.getStr(ZkStateReader.STATE_PROP));
boolean isLive = liveNodes.contains(replica.getNodeName());
if (!isLocalCore && isActive && isLive) {
coreUrls.add(new ZkCoreNodeProps(replica).getCoreUrl());
}
}
}
return coreUrls;
}
/**
* SolrRequest that polls a replica's /schema/zkversion endpoint until the replica
* reports a schema version at or above the expected one. Also implements Callable
* so many replicas can be polled in parallel; the polling loop is bounded only by
* the enclosing ExecutorService's invokeAll timeout.
*/
private static class GetZkSchemaVersionCallable extends SolrRequest implements Callable<Integer> {
private String coreUrl;
private int expectedZkVersion;
GetZkSchemaVersionCallable(String coreUrl, int expectedZkVersion) {
super(METHOD.GET, "/schema/zkversion");
this.coreUrl = coreUrl;
this.expectedZkVersion = expectedZkVersion;
}
@Override
public SolrParams getParams() {
// ask the replica to proactively refresh its schema from ZooKeeper if it is
// still below the version we're waiting for
ModifiableSolrParams wparams = new ModifiableSolrParams();
wparams.set("refreshIfBelowVersion", expectedZkVersion);
return wparams;
}
/**
* Polls the replica until it reports the expected schema version or the thread
* is interrupted; returns the last version seen (-1 if none was retrieved).
*/
@Override
public Integer call() throws Exception {
HttpSolrServer solr = new HttpSolrServer(coreUrl);
int remoteVersion = -1;
try {
// eventually, this loop will get killed by the ExecutorService's timeout
while (remoteVersion == -1 || remoteVersion < expectedZkVersion) {
try {
HttpSolrServer.HttpUriRequestResponse mrr = solr.httpUriRequest(this);
NamedList<Object> zkversionResp = mrr.future.get();
if (zkversionResp != null)
remoteVersion = (Integer)zkversionResp.get("zkversion");
if (remoteVersion < expectedZkVersion) {
// rather than waiting and re-polling, refreshIfBelowVersion asks the replica
// to refresh proactively, so we just log and re-poll immediately.
// NOTE(review): this is a tight retry loop with no backoff (a Thread.sleep
// between polls was considered and commented out) — it may hammer the
// replica until the executor timeout fires; consider a short delay.
log.error("Replica "+coreUrl+" returned schema version "+
remoteVersion+" and has not applied schema version "+expectedZkVersion);
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
break; // stop looping
} else {
log.warn("Failed to get /schema/zkversion from " + coreUrl + " due to: " + e);
}
}
}
} finally {
solr.shutdown();
}
return remoteVersion;
}
// this request carries no body
@Override
public Collection<ContentStream> getContentStreams() throws IOException {
return null;
}
// unused: requests are issued via httpUriRequest in call(), not via process()
@Override
public SolrResponse process(SolrServer server) throws SolrServerException, IOException {
return null;
}
}
public class FieldExistsException extends SolrException {
public FieldExistsException(ErrorCode code, String msg) {
super(code, msg);
@ -203,7 +385,7 @@ public final class ManagedIndexSchema extends IndexSchema {
super(code, msg);
}
}
@Override
public ManagedIndexSchema addField(SchemaField newField) {
return addFields(Arrays.asList(newField));
@ -483,6 +665,10 @@ public final class ManagedIndexSchema extends IndexSchema {
}
return sf;
}
/** Returns the ZooKeeper znode version of the managed schema this instance was loaded from. */
public int getSchemaZkVersion() {
return schemaZkVersion;
}
@Override
public SchemaField newDynamicField(String fieldNamePattern, String fieldType, Map<String,?> options) {

View File

@ -389,6 +389,8 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol
this.core = core;
if (loader instanceof ZkSolrResourceLoader) {
this.zkIndexSchemaReader = new ZkIndexSchemaReader(this);
ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader)loader;
zkLoader.setZkIndexSchemaReader(this.zkIndexSchemaReader);
} else {
this.zkIndexSchemaReader = null;
}

View File

@ -63,7 +63,7 @@ public class ZkIndexSchemaReader {
}
log.info("A schema change: {}, has occurred - updating schema from ZooKeeper ...", event);
try {
updateSchema(this);
updateSchema(this, -1);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
@ -89,19 +89,26 @@ public class ZkIndexSchemaReader {
}
}
private void updateSchema(Watcher watcher) throws KeeperException, InterruptedException {
/**
* Public entry point (used by the /schema/zkversion endpoint) that refreshes the
* schema from ZooKeeper if the locally loaded version is below expectedZkVersion.
*
* @param expectedZkVersion refresh only if local version is below this; -1 forces a check
* @return the current (possibly refreshed) schema
*/
public ManagedIndexSchema refreshSchemaFromZk(int expectedZkVersion) throws KeeperException, InterruptedException {
updateSchema(null, expectedZkVersion);
return managedIndexSchemaFactory.getSchema();
}
/**
* Reads the managed schema from ZooKeeper and, if its znode version differs from
* the currently loaded schema's version, reloads the fields and swaps the new
* schema into the factory. Holds the schema update lock for the whole operation.
*
* @param watcher           watcher to (re-)register on the schema znode; may be null
* @param expectedZkVersion skip reading from ZK when the local schema is already at
*                          or beyond this version; -1 forces an unconditional check
*/
private void updateSchema(Watcher watcher, int expectedZkVersion) throws KeeperException, InterruptedException {
Stat stat = new Stat();
synchronized (getSchemaUpdateLock()) {
final ManagedIndexSchema oldSchema = managedIndexSchemaFactory.getSchema();
// avoid a ZK round-trip if we already have the version the caller expects
if (expectedZkVersion == -1 || oldSchema.schemaZkVersion < expectedZkVersion) {
byte[] data = zkClient.getData(managedSchemaPath, watcher, stat, true);
if (stat.getVersion() != oldSchema.schemaZkVersion) {
log.info("Retrieved schema version "+stat.getVersion()+" from ZooKeeper");
long start = System.nanoTime();
InputSource inputSource = new InputSource(new ByteArrayInputStream(data));
ManagedIndexSchema newSchema = oldSchema.reloadFields(inputSource, stat.getVersion());
managedIndexSchemaFactory.setSchema(newSchema);
long stop = System.nanoTime();
log.info("Finished refreshing schema in " + TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS) + " ms");
}
}
}
}

View File

@ -19,6 +19,12 @@ package org.apache.solr.schema;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.util.BaseTestHarness;
import org.apache.solr.util.RESTfulServerProvider;
import org.apache.solr.util.RestTestHarness;
@ -27,6 +33,8 @@ import org.restlet.ext.servlet.ServerServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.data.Stat;
import org.junit.BeforeClass;
import java.util.ArrayList;
@ -96,51 +104,65 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
}
}
private static void addFieldPut(RestTestHarness publisher, String fieldName) throws Exception {
private static void addFieldPut(RestTestHarness publisher, String fieldName, int updateTimeoutSecs) throws Exception {
final String content = "{\"type\":\"text\",\"stored\":\"false\"}";
String request = "/schema/fields/" + fieldName + "?wt=xml";
if (updateTimeoutSecs > 0)
request += "&updateTimeoutSecs="+updateTimeoutSecs;
String response = publisher.put(request, content);
verifySuccess(request, response);
}
private static void addFieldPost(RestTestHarness publisher, String fieldName) throws Exception {
private static void addFieldPost(RestTestHarness publisher, String fieldName, int updateTimeoutSecs) throws Exception {
final String content = "[{\"name\":\""+fieldName+"\",\"type\":\"text\",\"stored\":\"false\"}]";
String request = "/schema/fields/?wt=xml";
if (updateTimeoutSecs > 0)
request += "&updateTimeoutSecs="+updateTimeoutSecs;
String response = publisher.post(request, content);
verifySuccess(request, response);
}
private static void addDynamicFieldPut(RestTestHarness publisher, String dynamicFieldPattern) throws Exception {
private static void addDynamicFieldPut(RestTestHarness publisher, String dynamicFieldPattern, int updateTimeoutSecs) throws Exception {
final String content = "{\"type\":\"text\",\"stored\":\"false\"}";
String request = "/schema/dynamicfields/" + dynamicFieldPattern + "?wt=xml";
if (updateTimeoutSecs > 0)
request += "&updateTimeoutSecs="+updateTimeoutSecs;
String response = publisher.put(request, content);
verifySuccess(request, response);
}
private static void addDynamicFieldPost(RestTestHarness publisher, String dynamicFieldPattern) throws Exception {
private static void addDynamicFieldPost(RestTestHarness publisher, String dynamicFieldPattern, int updateTimeoutSecs) throws Exception {
final String content = "[{\"name\":\""+dynamicFieldPattern+"\",\"type\":\"text\",\"stored\":\"false\"}]";
String request = "/schema/dynamicfields/?wt=xml";
if (updateTimeoutSecs > 0)
request += "&updateTimeoutSecs="+updateTimeoutSecs;
String response = publisher.post(request, content);
verifySuccess(request, response);
}
private static void copyField(RestTestHarness publisher, String source, String dest) throws Exception {
private static void copyField(RestTestHarness publisher, String source, String dest, int updateTimeoutSecs) throws Exception {
final String content = "[{\"source\":\""+source+"\",\"dest\":[\""+dest+"\"]}]";
String request = "/schema/copyfields/?wt=xml";
if (updateTimeoutSecs > 0)
request += "&updateTimeoutSecs="+updateTimeoutSecs;
String response = publisher.post(request, content);
verifySuccess(request, response);
}
private static void addFieldTypePut(RestTestHarness publisher, String typeName) throws Exception {
private static void addFieldTypePut(RestTestHarness publisher, String typeName, int updateTimeoutSecs) throws Exception {
final String content = "{\"class\":\"solr.TrieIntField\"}";
String request = "/schema/fieldtypes/" + typeName + "?wt=xml";
if (updateTimeoutSecs > 0)
request += "&updateTimeoutSecs="+updateTimeoutSecs;
String response = publisher.put(request, content);
verifySuccess(request, response);
}
private static void addFieldTypePost(RestTestHarness publisher, String typeName) throws Exception {
private static void addFieldTypePost(RestTestHarness publisher, String typeName, int updateTimeoutSecs) throws Exception {
final String content = "[{\"name\":\""+typeName+"\",\"class\":\"solr.TrieIntField\"}]";
String request = "/schema/fieldtypes/?wt=xml";
if (updateTimeoutSecs > 0)
request += "&updateTimeoutSecs="+updateTimeoutSecs;
String response = publisher.post(request, content);
verifySuccess(request, response);
}
@ -220,6 +242,7 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
@Override
public void doTest() throws Exception {
verifyWaitForSchemaUpdateToPropagate();
setupHarnesses();
concurrentOperationsTest();
schemaLockTest();
@ -244,23 +267,23 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
PUT_AddField {
@Override public void execute(RestTestHarness publisher, int fieldNum, Info info) throws Exception {
String fieldname = PUT_FIELDNAME + info.numAddFieldPuts++;
addFieldPut(publisher, fieldname);
addFieldPut(publisher, fieldname, 15);
}
},
POST_AddField {
@Override public void execute(RestTestHarness publisher, int fieldNum, Info info) throws Exception {
String fieldname = POST_FIELDNAME + info.numAddFieldPosts++;
addFieldPost(publisher, fieldname);
addFieldPost(publisher, fieldname, 15);
}
},
PUT_AddDynamicField {
@Override public void execute(RestTestHarness publisher, int fieldNum, Info info) throws Exception {
addDynamicFieldPut(publisher, PUT_DYNAMIC_FIELDNAME + info.numAddDynamicFieldPuts++ + "_*");
addDynamicFieldPut(publisher, PUT_DYNAMIC_FIELDNAME + info.numAddDynamicFieldPuts++ + "_*", 15);
}
},
POST_AddDynamicField {
@Override public void execute(RestTestHarness publisher, int fieldNum, Info info) throws Exception {
addDynamicFieldPost(publisher, POST_DYNAMIC_FIELDNAME + info.numAddDynamicFieldPosts++ + "_*");
addDynamicFieldPost(publisher, POST_DYNAMIC_FIELDNAME + info.numAddDynamicFieldPosts++ + "_*", 15);
}
},
POST_AddCopyField {
@ -273,7 +296,7 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
sourceField = "name";
} else if (sourceType == 1) { // newly created
sourceField = "copySource" + fieldNum;
addFieldPut(publisher, sourceField);
addFieldPut(publisher, sourceField, 15);
} else { // dynamic
sourceField = "*_dynamicSource" + fieldNum + "_t";
// * only supported if both src and dst use it
@ -286,23 +309,23 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
destField = "title";
} else { // newly created
destField = "copyDest" + fieldNum;
addFieldPut(publisher, destField);
addFieldPut(publisher, destField, 15);
}
}
copyField(publisher, sourceField, destField);
copyField(publisher, sourceField, destField, 15);
info.copyFields.add(new CopyFieldInfo(sourceField, destField));
}
},
PUT_AddFieldType {
@Override public void execute(RestTestHarness publisher, int fieldNum, Info info) throws Exception {
String typeName = PUT_FIELDTYPE + info.numAddFieldTypePuts++;
addFieldTypePut(publisher, typeName);
addFieldTypePut(publisher, typeName, 15);
}
},
POST_AddFieldType {
@Override public void execute(RestTestHarness publisher, int fieldNum, Info info) throws Exception {
String typeName = POST_FIELDTYPE + info.numAddFieldTypePosts++;
addFieldTypePost(publisher, typeName);
addFieldTypePost(publisher, typeName, 15);
}
};
@ -315,6 +338,42 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
}
}
/**
 * Issues a schema update through the shard1 leader with updateTimeoutSecs=15
 * (server blocks until every replica has applied it), then reads the schema's
 * ZK version straight from ZooKeeper and asserts every replica of the
 * collection reports that same version via /schema/zkversion.
 */
private void verifyWaitForSchemaUpdateToPropagate() throws Exception {
  final String collection = "collection1";
  ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();

  // Drive the schema update through the shard1 leader.
  Replica leader = clusterState.getLeader(collection, "shard1");
  String leaderCoreUrl = (new ZkCoreNodeProps(leader)).getCoreUrl();
  assertNotNull(leaderCoreUrl);
  final String leaderBaseUrl =
      leaderCoreUrl.endsWith("/") ? leaderCoreUrl.substring(0, leaderCoreUrl.length()-1) : leaderCoreUrl;
  RestTestHarness leaderHarness = new RestTestHarness(new RESTfulServerProvider() {
    public String getBaseURL() {
      return leaderBaseUrl;
    }
  });
  addFieldTypePut(leaderHarness, "fooInt", 15);

  // Fetch the schema version that resulted from the update directly from ZK.
  Stat schemaStat = new Stat();
  cloudClient.getZkStateReader().getZkClient()
      .getData("/configs/conf1/managed-schema", null, schemaStat, false);
  final int expectedZkVersion = schemaStat.getVersion();

  // Every replica of every active slice must now report that exact version.
  for (Slice slice : clusterState.getActiveSlices(collection)) {
    for (Replica replica : slice.getReplicas()) {
      String coreUrl = (new ZkCoreNodeProps(replica)).getCoreUrl();
      final String replicaBaseUrl =
          coreUrl.endsWith("/") ? coreUrl.substring(0, coreUrl.length()-1) : coreUrl;
      RestTestHarness replicaHarness = new RestTestHarness(new RESTfulServerProvider() {
        public String getBaseURL() {
          return replicaBaseUrl;
        }
      });
      replicaHarness.validateQuery("/schema/zkversion?wt=xml", "//zkversion="+expectedZkVersion);
    }
  }
}
private void concurrentOperationsTest() throws Exception {
// First, add a bunch of fields and dynamic fields via PUT and POST, as well as copyFields,
@ -405,7 +464,9 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
}
public void run() {
try {
addFieldPut(harness, fieldName);
// don't have the client side wait for all replicas to see the update or that defeats the purpose
// of testing the locking support on the server-side
addFieldPut(harness, fieldName, -1);
} catch (Exception e) {
// log.error("###ACTUAL FAILURE!");
throw new RuntimeException(e);
@ -420,7 +481,7 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
}
public void run() {
try {
addFieldPost(harness, fieldName);
addFieldPost(harness, fieldName, -1);
} catch (Exception e) {
// log.error("###ACTUAL FAILURE!");
throw new RuntimeException(e);
@ -435,7 +496,7 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
}
public void run() {
try {
addFieldTypePut(harness, fieldName);
addFieldTypePut(harness, fieldName, -1);
} catch (Exception e) {
// log.error("###ACTUAL FAILURE!");
throw new RuntimeException(e);
@ -450,7 +511,7 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
}
public void run() {
try {
addFieldTypePost(harness, fieldName);
addFieldTypePost(harness, fieldName, -1);
} catch (Exception e) {
// log.error("###ACTUAL FAILURE!");
throw new RuntimeException(e);
@ -465,7 +526,7 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
}
public void run() {
try {
addFieldPut(harness, fieldName);
addFieldPut(harness, fieldName, -1);
} catch (Exception e) {
// log.error("###ACTUAL FAILURE!");
throw new RuntimeException(e);
@ -480,7 +541,7 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
}
public void run() {
try {
addFieldPost(harness, fieldName);
addFieldPost(harness, fieldName, -1);
} catch (Exception e) {
// log.error("###ACTUAL FAILURE!");
throw new RuntimeException(e);