mirror of https://github.com/apache/lucene.git
SOLR-6249: support re-establishing a new watcher on the managed schema znode after zk session expiration.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1629229 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
73ec0f28cc
commit
c44e4ee0ff
|
@ -189,6 +189,9 @@ public final class ZkController {
|
|||
// and interact with zookeeper via the Solr admin UI on a node outside the cluster,
|
||||
// and so one that will not be killed or stopped when testing. See developer cloud-scripts.
|
||||
private boolean zkRunOnly = Boolean.getBoolean("zkRunOnly"); // expert
|
||||
|
||||
// keeps track of a list of objects that need to know a new ZooKeeper session was created after expiration occurred
|
||||
private List<OnReconnect> reconnectListeners = new ArrayList<OnReconnect>();
|
||||
|
||||
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
|
||||
String localHostContext, int leaderVoteWait, int leaderConflictResolveWait, boolean genericCoreNodeNames, final CurrentCoreDescriptorProvider registerOnReconnect)
|
||||
|
@ -239,8 +242,9 @@ public final class ZkController {
|
|||
|
||||
@Override
|
||||
public void command() {
|
||||
log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
|
||||
|
||||
try {
|
||||
|
||||
// this is troublesome - we dont want to kill anything the old
|
||||
// leader accepted
|
||||
// though I guess sync will likely get those updates back? But
|
||||
|
@ -294,7 +298,19 @@ public final class ZkController {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// notify any other objects that need to know when the session was re-connected
|
||||
synchronized (reconnectListeners) {
|
||||
for (OnReconnect listener : reconnectListeners) {
|
||||
try {
|
||||
listener.command();
|
||||
} catch (Exception exc) {
|
||||
// not much we can do here other than warn in the log
|
||||
log.warn("Error when notifying OnReconnect listener "+listener+" after session re-connected.", exc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
// Restore the interrupted status
|
||||
Thread.currentThread().interrupt();
|
||||
|
@ -2013,4 +2029,16 @@ public final class ZkController {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a listener to be notified once there is a new session created after a ZooKeeper session expiration occurs;
|
||||
* in most cases, listeners will be components that have watchers that need to be re-created.
|
||||
*/
|
||||
public void addOnReconnectListener(OnReconnect listener) {
|
||||
if (listener != null) {
|
||||
synchronized (reconnectListeners) {
|
||||
reconnectListeners.add(listener);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,7 +60,8 @@ public class SchemaZkVersionResource extends BaseSolrResource implements GETable
|
|||
ManagedIndexSchema managed = (ManagedIndexSchema)schema;
|
||||
zkVersion = managed.getSchemaZkVersion();
|
||||
if (refreshIfBelowVersion != -1 && zkVersion < refreshIfBelowVersion) {
|
||||
log.info("\n\n\n REFRESHING SCHEMA (refreshIfBelowVersion="+refreshIfBelowVersion+") before returning version! \n\n\n");
|
||||
log.info("REFRESHING SCHEMA (refreshIfBelowVersion="+refreshIfBelowVersion+
|
||||
", currentVersion="+zkVersion+") before returning version!");
|
||||
ZkSolrResourceLoader zkSolrResourceLoader = (ZkSolrResourceLoader)getSolrCore().getResourceLoader();
|
||||
ZkIndexSchemaReader zkIndexSchemaReader = zkSolrResourceLoader.getZkIndexSchemaReader();
|
||||
managed = zkIndexSchemaReader.refreshSchemaFromZk(refreshIfBelowVersion);
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.solr.schema;
|
|||
|
||||
import org.apache.solr.cloud.ZkSolrResourceLoader;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.OnReconnect;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZooKeeperException;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
@ -32,7 +33,7 @@ import java.io.ByteArrayInputStream;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/** Keeps a ManagedIndexSchema up-to-date when changes are made to the serialized managed schema in ZooKeeper */
|
||||
public class ZkIndexSchemaReader {
|
||||
public class ZkIndexSchemaReader implements OnReconnect {
|
||||
private static final Logger log = LoggerFactory.getLogger(ZkIndexSchemaReader.class);
|
||||
private final ManagedIndexSchemaFactory managedIndexSchemaFactory;
|
||||
private SolrZkClient zkClient;
|
||||
|
@ -44,6 +45,7 @@ public class ZkIndexSchemaReader {
|
|||
this.zkClient = zkLoader.getZkController().getZkClient();
|
||||
managedSchemaPath = zkLoader.getCollectionZkPath() + "/" + managedIndexSchemaFactory.getManagedSchemaResourceName();
|
||||
createSchemaWatcher();
|
||||
zkLoader.getZkController().addOnReconnectListener(this);
|
||||
}
|
||||
|
||||
public Object getSchemaUpdateLock() {
|
||||
|
@ -51,7 +53,7 @@ public class ZkIndexSchemaReader {
|
|||
}
|
||||
|
||||
public void createSchemaWatcher() {
|
||||
log.info("Creating ZooKeeper watch for the managed schema at " + managedSchemaPath + " ...");
|
||||
log.info("Creating ZooKeeper watch for the managed schema at " + managedSchemaPath);
|
||||
|
||||
try {
|
||||
zkClient.exists(managedSchemaPath, new Watcher() {
|
||||
|
@ -108,8 +110,26 @@ public class ZkIndexSchemaReader {
|
|||
managedIndexSchemaFactory.setSchema(newSchema);
|
||||
long stop = System.nanoTime();
|
||||
log.info("Finished refreshing schema in " + TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS) + " ms");
|
||||
} else {
|
||||
log.info("Current schema version "+oldSchema.schemaZkVersion+" is already the latest");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after a ZooKeeper session expiration occurs; need to re-create the watcher and update the current
|
||||
* schema from ZooKeeper.
|
||||
*/
|
||||
@Override
|
||||
public void command() {
|
||||
try {
|
||||
// setup a new watcher to get notified when the managed schema changes
|
||||
createSchemaWatcher();
|
||||
// force update now as the schema may have changed while our zk session was expired
|
||||
updateSchema(null, -1);
|
||||
} catch (Exception exc) {
|
||||
log.error("Failed to update managed-schema watcher after session expiration due to: "+exc, exc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ package org.apache.solr.schema;
|
|||
*/
|
||||
|
||||
import org.apache.solr.client.solrj.SolrServer;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrServer;
|
||||
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
|
@ -357,19 +358,72 @@ public class TestCloudManagedSchemaConcurrent extends AbstractFullDistribZkTestB
|
|||
// go into ZK to get the version of the managed schema after the update
|
||||
SolrZkClient zkClient = cloudClient.getZkStateReader().getZkClient();
|
||||
Stat stat = new Stat();
|
||||
zkClient.getData("/configs/conf1/managed-schema", null, stat, false);
|
||||
final int schemaZkVersion = stat.getVersion();
|
||||
String znodePath = "/configs/conf1/managed-schema";
|
||||
byte[] managedSchemaBytes = zkClient.getData(znodePath, null, stat, false);
|
||||
int schemaZkVersion = stat.getVersion();
|
||||
|
||||
// now loop over all replicas and verify each has the same schema version
|
||||
Replica randomReplicaNotLeader = null;
|
||||
for (Slice slice : clusterState.getActiveSlices(testCollectionName)) {
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
final String replicaUrl = (new ZkCoreNodeProps(replica)).getCoreUrl();
|
||||
RestTestHarness testHarness = new RestTestHarness(new RESTfulServerProvider() {
|
||||
public String getBaseURL() {
|
||||
return replicaUrl.endsWith("/") ? replicaUrl.substring(0, replicaUrl.length()-1) : replicaUrl;
|
||||
}
|
||||
});
|
||||
testHarness.validateQuery("/schema/zkversion?wt=xml", "//zkversion="+schemaZkVersion);
|
||||
validateZkVersion(replica, schemaZkVersion, 0, false);
|
||||
|
||||
// save a random replica to test zk watcher behavior
|
||||
if (randomReplicaNotLeader == null && !replica.getName().equals(shard1Leader.getName()))
|
||||
randomReplicaNotLeader = replica;
|
||||
}
|
||||
}
|
||||
assertNotNull(randomReplicaNotLeader);
|
||||
|
||||
// now update the data and then verify the znode watcher fires correctly
|
||||
// before an after a zk session expiration (see SOLR-6249)
|
||||
zkClient.setData(znodePath, managedSchemaBytes, schemaZkVersion, false);
|
||||
stat = new Stat();
|
||||
managedSchemaBytes = zkClient.getData(znodePath, null, stat, false);
|
||||
int updatedSchemaZkVersion = stat.getVersion();
|
||||
assertTrue(updatedSchemaZkVersion > schemaZkVersion);
|
||||
validateZkVersion(randomReplicaNotLeader, updatedSchemaZkVersion, 2, true);
|
||||
|
||||
// ok - looks like the watcher fired correctly on the replica
|
||||
// now, expire that replica's zk session and then verify the watcher fires again (after reconnect)
|
||||
JettySolrRunner randomReplicaJetty =
|
||||
getJettyOnPort(getReplicaPort(randomReplicaNotLeader));
|
||||
assertNotNull(randomReplicaJetty);
|
||||
chaosMonkey.expireSession(randomReplicaJetty);
|
||||
|
||||
// update the data again to cause watchers to fire
|
||||
zkClient.setData(znodePath, managedSchemaBytes, updatedSchemaZkVersion, false);
|
||||
stat = new Stat();
|
||||
managedSchemaBytes = zkClient.getData(znodePath, null, stat, false);
|
||||
updatedSchemaZkVersion = stat.getVersion();
|
||||
// give up to 10 secs for the replica to recover after zk session loss and see the update
|
||||
validateZkVersion(randomReplicaNotLeader, updatedSchemaZkVersion, 10, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a GET request to get the zk schema version from a specific replica.
|
||||
*/
|
||||
protected void validateZkVersion(Replica replica, int schemaZkVersion, int waitSecs, boolean retry) throws Exception {
|
||||
final String replicaUrl = (new ZkCoreNodeProps(replica)).getCoreUrl();
|
||||
RestTestHarness testHarness = new RestTestHarness(new RESTfulServerProvider() {
|
||||
public String getBaseURL() {
|
||||
return replicaUrl.endsWith("/") ? replicaUrl.substring(0, replicaUrl.length()-1) : replicaUrl;
|
||||
}
|
||||
});
|
||||
|
||||
long waitMs = waitSecs * 1000L;
|
||||
if (waitMs > 0) Thread.sleep(waitMs); // wait a moment for the zk watcher to fire
|
||||
|
||||
try {
|
||||
testHarness.validateQuery("/schema/zkversion?wt=xml", "//zkversion=" + schemaZkVersion);
|
||||
} catch (Exception exc) {
|
||||
if (retry) {
|
||||
// brief wait before retrying
|
||||
Thread.sleep(waitMs > 0 ? waitMs : 2000L);
|
||||
|
||||
testHarness.validateQuery("/schema/zkversion?wt=xml", "//zkversion=" + schemaZkVersion);
|
||||
} else {
|
||||
throw exc;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -549,6 +549,14 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
|||
return jetty;
|
||||
}
|
||||
|
||||
protected int getReplicaPort(Replica replica) {
|
||||
String replicaNode = replica.getNodeName();
|
||||
String tmp = replicaNode.substring(replicaNode.indexOf(':')+1);
|
||||
if (tmp.indexOf('_') != -1)
|
||||
tmp = tmp.substring(0,tmp.indexOf('_'));
|
||||
return Integer.parseInt(tmp);
|
||||
}
|
||||
|
||||
protected JettySolrRunner getJettyOnPort(int port) {
|
||||
JettySolrRunner theJetty = null;
|
||||
for (JettySolrRunner jetty : jettys) {
|
||||
|
|
Loading…
Reference in New Issue