SOLR-9847: Stop blocking further schema updates while waiting for a pending update to propagate to other replicas. This reduces the likelihood of a (time-limited) distributed deadlock during concurrent schema updates.

This commit is contained in:
Steve Rowe 2016-12-20 12:05:33 -05:00
parent b37a72d941
commit 04108d9935
2 changed files with 59 additions and 44 deletions

View File

@ -271,6 +271,10 @@ Bug Fixes
* SOLR-1953: It may be possible for temporary files to accumulate until the Solr process is shut down.
(Karl Wright, Mark Miller)
* SOLR-9847: Stop blocking further schema updates while waiting for a pending update to propagate to other replicas.
This reduces the likelihood of a (time-limited) distributed deadlock during concurrent schema updates.
(Mark Miller, Steve Rowe)
Other Changes
----------------------

View File

@ -88,9 +88,7 @@ public class SchemaManager {
IndexSchema schema = req.getCore().getLatestSchema(); IndexSchema schema = req.getCore().getLatestSchema();
if (schema instanceof ManagedIndexSchema && schema.isMutable()) { if (schema instanceof ManagedIndexSchema && schema.isMutable()) {
synchronized (schema.getSchemaUpdateLock()) { return doOperations(ops);
return doOperations(ops);
}
} else { } else {
return singletonList(singletonMap(CommandOperation.ERR_MSGS, "schema is not editable")); return singletonList(singletonMap(CommandOperation.ERR_MSGS, "schema is not editable"));
} }
@ -107,52 +105,65 @@ public class SchemaManager {
TimeOut timeOut = new TimeOut(timeout, TimeUnit.SECONDS); TimeOut timeOut = new TimeOut(timeout, TimeUnit.SECONDS);
SolrCore core = req.getCore(); SolrCore core = req.getCore();
String errorMsg = "Unable to persist managed schema. "; String errorMsg = "Unable to persist managed schema. ";
while (!timeOut.hasTimedOut()) { List errors = Collections.emptyList();
managedIndexSchema = getFreshManagedSchema(req.getCore()); int latestVersion = -1;
for (CommandOperation op : operations) {
OpType opType = OpType.get(op.name);
if (opType != null) {
opType.perform(op, this);
} else {
op.addError("No such operation : " + op.name);
}
}
List errs = CommandOperation.captureErrors(operations);
if (!errs.isEmpty()) return errs;
SolrResourceLoader loader = req.getCore().getResourceLoader();
if (loader instanceof ZkSolrResourceLoader) {
ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader) loader;
StringWriter sw = new StringWriter();
try {
managedIndexSchema.persist(sw);
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "unable to serialize schema");
//unlikely
}
try { synchronized (req.getSchema().getSchemaUpdateLock()) {
int latestVersion = ZkController.persistConfigResourceToZooKeeper(zkLoader, managedIndexSchema.getSchemaZkVersion(), while (!timeOut.hasTimedOut()) {
managedIndexSchema.getResourceName(), sw.toString().getBytes(StandardCharsets.UTF_8), true); managedIndexSchema = getFreshManagedSchema(req.getCore());
req.getCore().getCoreDescriptor().getCoreContainer().reload(req.getCore().getName()); for (CommandOperation op : operations) {
waitForOtherReplicasToUpdate(timeOut, latestVersion); OpType opType = OpType.get(op.name);
return Collections.emptyList(); if (opType != null) {
} catch (ZkController.ResourceModifiedInZkException e) { opType.perform(op, this);
log.info("Schema was modified by another node. Retrying.."); } else {
op.addError("No such operation : " + op.name);
}
} }
} else { errors = CommandOperation.captureErrors(operations);
try { if (!errors.isEmpty()) break;
//only for non cloud stuff SolrResourceLoader loader = req.getCore().getResourceLoader();
managedIndexSchema.persistManagedSchema(false); if (loader instanceof ZkSolrResourceLoader) {
core.setLatestSchema(managedIndexSchema); ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader) loader;
return Collections.emptyList(); StringWriter sw = new StringWriter();
} catch (SolrException e) { try {
log.warn(errorMsg); managedIndexSchema.persist(sw);
return singletonList(errorMsg + e.getMessage()); } catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "unable to serialize schema");
//unlikely
}
try {
latestVersion = ZkController.persistConfigResourceToZooKeeper
(zkLoader, managedIndexSchema.getSchemaZkVersion(), managedIndexSchema.getResourceName(),
sw.toString().getBytes(StandardCharsets.UTF_8), true);
req.getCore().getCoreDescriptor().getCoreContainer().reload(req.getCore().getName());
break;
} catch (ZkController.ResourceModifiedInZkException e) {
log.info("Schema was modified by another node. Retrying..");
}
} else {
try {
//only for non cloud stuff
managedIndexSchema.persistManagedSchema(false);
core.setLatestSchema(managedIndexSchema);
} catch (SolrException e) {
log.warn(errorMsg);
errors = singletonList(errorMsg + e.getMessage());
}
break;
} }
} }
} }
log.warn(errorMsg + "Timed out."); if (req.getCore().getResourceLoader() instanceof ZkSolrResourceLoader) {
return singletonList(errorMsg + "Timed out."); // Don't block further schema updates while waiting for a pending update to propagate to other replicas.
// This reduces the likelihood of a (time-limited) distributed deadlock during concurrent schema updates.
waitForOtherReplicasToUpdate(timeOut, latestVersion);
}
if (errors.isEmpty() && timeOut.hasTimedOut()) {
log.warn(errorMsg + "Timed out.");
errors = singletonList(errorMsg + "Timed out.");
}
return errors;
} }
private void waitForOtherReplicasToUpdate(TimeOut timeOut, int latestVersion) { private void waitForOtherReplicasToUpdate(TimeOut timeOut, int latestVersion) {