mirror of https://github.com/apache/lucene.git
SOLR-14192: Race condition between SchemaManager and ZkIndexSchemaReader.
This commit is contained in:
parent
bddb06b650
commit
6244b7150e
|
@ -225,6 +225,8 @@ Bug Fixes
|
|||
|
||||
* SOLR-6613: TextField.analyzeMultiTerm does not throw an exception when Analyzer returns no terms. (Bruno Roustant)
|
||||
|
||||
* SOLR-14192: Race condition between SchemaManager and ZkIndexSchemaReader. (ab)
|
||||
|
||||
Other Changes
|
||||
---------------------
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.solr.core.SolrCore;
|
|||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.apache.solr.util.SystemIdResolver;
|
||||
import org.apache.solr.util.plugin.SolrCoreAware;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -44,7 +45,7 @@ import org.xml.sax.InputSource;
|
|||
/** Factory for ManagedIndexSchema */
|
||||
public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements SolrCoreAware {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private static final String UPGRADED_SCHEMA_EXTENSION = ".bak";
|
||||
public static final String UPGRADED_SCHEMA_EXTENSION = ".bak";
|
||||
private static final String SCHEMA_DOT_XML = "schema.xml";
|
||||
|
||||
public static final String DEFAULT_MANAGED_SCHEMA_RESOURCE_NAME = "managed-schema";
|
||||
|
@ -326,9 +327,19 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol
|
|||
} else {
|
||||
// Rename the non-managed schema znode in ZooKeeper
|
||||
ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader)loader;
|
||||
ZkController zkController = zkLoader.getZkController();
|
||||
SolrZkClient zkClient = zkController.getZkClient();
|
||||
final String nonManagedSchemaPath = zkLoader.getConfigSetZkPath() + "/" + resourceName;
|
||||
final String lockPath = nonManagedSchemaPath + ".lock";
|
||||
boolean locked = false;
|
||||
try {
|
||||
ZkController zkController = zkLoader.getZkController();
|
||||
try {
|
||||
zkClient.makePath(lockPath, null, CreateMode.EPHEMERAL, null, true, true);
|
||||
locked = true;
|
||||
} catch (Exception e) {
|
||||
// some other node already started the upgrade, or an error occurred - bail out
|
||||
return;
|
||||
}
|
||||
ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor(zkController.getClientTimeout());
|
||||
if (zkController.pathExists(nonManagedSchemaPath)) {
|
||||
// First, copy the non-managed schema znode content to the upgraded schema znode
|
||||
|
@ -360,6 +371,17 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol
|
|||
}
|
||||
final String msg = "Error persisting managed schema resource " + managedSchemaResourceName;
|
||||
log.warn(msg, e); // Log as warning and suppress the exception
|
||||
} finally {
|
||||
if (locked) {
|
||||
// unlock
|
||||
try {
|
||||
zkClient.delete(lockPath, -1, true);
|
||||
} catch (KeeperException.NoNodeException nne) {
|
||||
// ignore - someone else deleted it
|
||||
} catch (Exception e) {
|
||||
log.warn("Unable to delete schema upgrade lock file " + lockPath, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.cloud.ZkSolrResourceLoader;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.util.TimeSource;
|
||||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
|
@ -63,9 +64,17 @@ public class SchemaManager {
|
|||
|
||||
final SolrQueryRequest req;
|
||||
ManagedIndexSchema managedIndexSchema;
|
||||
int timeout;
|
||||
|
||||
public SchemaManager(SolrQueryRequest req){
|
||||
this.req = req;
|
||||
//The default timeout is 10 minutes when no BaseSolrResource.UPDATE_TIMEOUT_SECS is specified
|
||||
timeout = req.getParams().getInt(BaseSolrResource.UPDATE_TIMEOUT_SECS, 600);
|
||||
|
||||
//If BaseSolrResource.UPDATE_TIMEOUT_SECS=0 or -1 then end time then we'll try for 10 mins ( default timeout )
|
||||
if (timeout < 1) {
|
||||
timeout = 600;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -87,13 +96,6 @@ public class SchemaManager {
|
|||
}
|
||||
|
||||
private List doOperations(List<CommandOperation> operations) throws InterruptedException, IOException, KeeperException {
|
||||
//The default timeout is 10 minutes when no BaseSolrResource.UPDATE_TIMEOUT_SECS is specified
|
||||
int timeout = req.getParams().getInt(BaseSolrResource.UPDATE_TIMEOUT_SECS, 600);
|
||||
|
||||
//If BaseSolrResource.UPDATE_TIMEOUT_SECS=0 or -1 then end time then we'll try for 10 mins ( default timeout )
|
||||
if (timeout < 1) {
|
||||
timeout = 600;
|
||||
}
|
||||
TimeOut timeOut = new TimeOut(timeout, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
||||
SolrCore core = req.getCore();
|
||||
String errorMsg = "Unable to persist managed schema. ";
|
||||
|
@ -414,12 +416,29 @@ public class SchemaManager {
|
|||
return sb.toString();
|
||||
}
|
||||
|
||||
public static ManagedIndexSchema getFreshManagedSchema(SolrCore core) throws IOException,
|
||||
private ManagedIndexSchema getFreshManagedSchema(SolrCore core) throws IOException,
|
||||
KeeperException, InterruptedException {
|
||||
|
||||
SolrResourceLoader resourceLoader = core.getResourceLoader();
|
||||
String name = core.getLatestSchema().getResourceName();
|
||||
if (resourceLoader instanceof ZkSolrResourceLoader) {
|
||||
SolrZkClient zkClient = ((ZkSolrResourceLoader) resourceLoader).getZkController().getZkClient();
|
||||
try {
|
||||
if (!zkClient.exists(name, true)) {
|
||||
String backupName = name + ManagedIndexSchemaFactory.UPGRADED_SCHEMA_EXTENSION;
|
||||
if (!zkClient.exists(backupName, true)) {
|
||||
log.warn("Unable to retrieve fresh managed schema, neither " + name + " nor " + backupName + " exist.");
|
||||
// use current schema
|
||||
return (ManagedIndexSchema) core.getLatestSchema();
|
||||
} else {
|
||||
name = backupName;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("Unable to retrieve fresh managed schema " + name, e);
|
||||
// use current schema
|
||||
return (ManagedIndexSchema) core.getLatestSchema();
|
||||
}
|
||||
InputStream in = resourceLoader.openResource(name);
|
||||
if (in instanceof ZkSolrResourceLoader.ZkByteArrayInputStream) {
|
||||
int version = ((ZkSolrResourceLoader.ZkByteArrayInputStream) in).getStat().getVersion();
|
||||
|
|
Loading…
Reference in New Issue