diff --git a/solr/core/src/java/org/apache/solr/cloud/CurrentCoreDescriptorProvider.java b/solr/core/src/java/org/apache/solr/cloud/CurrentCoreDescriptorProvider.java deleted file mode 100644 index 29d07511cee..00000000000 --- a/solr/core/src/java/org/apache/solr/cloud/CurrentCoreDescriptorProvider.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.solr.cloud; - -import java.util.List; - -import org.apache.solr.core.CoreDescriptor; - -/** - * Provide the current list of registered {@link CoreDescriptor}s. - */ -public abstract class CurrentCoreDescriptorProvider { - public abstract List getCurrentDescriptors(); -} diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index d2ec34bc203..843236a9163 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -48,6 +48,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import com.google.common.base.Strings; import org.apache.commons.lang3.StringUtils; @@ -287,7 +288,14 @@ public class ZkController implements Closeable { } } - public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig, final CurrentCoreDescriptorProvider registerOnReconnect) + /** + * @param cc Core container associated with this controller. cannot be null. + * @param zkServerAddress where to connect to the zk server + * @param zkClientConnectTimeout timeout in ms + * @param cloudConfig configuration for this controller. TODO: possibly redundant with CoreContainer + * @param descriptorsSupplier a supplier of the current core descriptors. used to know which cores to re-register on reconnect + */ + public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig, final Supplier> descriptorsSupplier) throws InterruptedException, TimeoutException, IOException { if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null."); @@ -364,13 +372,14 @@ public class ZkController implements Closeable { } overseerElector.setup(context); + overseerElector.joinElection(context, true); } cc.cancelCoreRecoveries(); try { - registerAllCoresAsDown(registerOnReconnect, false); + registerAllCoresAsDown(descriptorsSupplier, false); } catch (SessionExpiredException e) { // zk has to reconnect and this will all be tried again throw e; @@ -383,7 +392,7 @@ public class ZkController implements Closeable { // we have to register as live first to pick up docs in the buffer createEphemeralLiveNode(); - List descriptors = registerOnReconnect.getCurrentDescriptors(); + List descriptors = descriptorsSupplier.get(); // re register all descriptors ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null; if (descriptors != null) { @@ -449,8 +458,8 @@ public class ZkController implements Closeable { } catch (Exception e) { log.error("Error trying to stop any Overseer threads", e); } - closeOutstandingElections(registerOnReconnect); - markAllAsNotLeader(registerOnReconnect); + closeOutstandingElections(descriptorsSupplier); + markAllAsNotLeader(descriptorsSupplier); } }, zkACLProvider, new ConnectionManager.IsClosed() { @@ -469,7 +478,7 @@ public class ZkController implements Closeable { if (cc != null) cc.securityNodeChanged(); }); - init(registerOnReconnect); + init(); this.overseerJobQueue = overseer.getStateUpdateQueue(); this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient); @@ -489,9 +498,8 @@ public class ZkController implements Closeable { } private void registerAllCoresAsDown( - final CurrentCoreDescriptorProvider registerOnReconnect, boolean updateLastPublished) throws SessionExpiredException { - List descriptors = registerOnReconnect - .getCurrentDescriptors(); + final Supplier> registerOnReconnect, boolean updateLastPublished) throws SessionExpiredException { + List descriptors = registerOnReconnect.get(); if (isClosed) return; if (descriptors != null) { // before registering as live, make sure everyone is in a @@ -546,9 +554,8 @@ public class ZkController implements Closeable { return sysPropsCacher; } - private void closeOutstandingElections(final CurrentCoreDescriptorProvider registerOnReconnect) { - - List descriptors = registerOnReconnect.getCurrentDescriptors(); + private void closeOutstandingElections(final Supplier> registerOnReconnect) { + List descriptors = registerOnReconnect.get(); if (descriptors != null) { for (CoreDescriptor descriptor : descriptors) { closeExistingElectionContext(descriptor); @@ -572,10 +579,8 @@ public class ZkController implements Closeable { return contextKey; } - private void markAllAsNotLeader( - final CurrentCoreDescriptorProvider registerOnReconnect) { - List descriptors = registerOnReconnect - .getCurrentDescriptors(); + private void markAllAsNotLeader(final Supplier> registerOnReconnect) { + List descriptors = registerOnReconnect.get(); if (descriptors != null) { for (CoreDescriptor descriptor : descriptors) { descriptor.getCloudDescriptor().setLeader(false); @@ -772,7 +777,7 @@ public class ZkController implements Closeable { // normalize host removing any url scheme. // input can be null, host, or url_prefix://host - private String normalizeHostName(String host) throws IOException { + private String normalizeHostName(String host) { if (host == null || host.length() == 0) { String hostaddress; @@ -895,8 +900,7 @@ public class ZkController implements Closeable { return configDirPath; } - private void init(CurrentCoreDescriptorProvider registerOnReconnect) { - + private void init() { try { createClusterZkNodes(zkClient); zkStateReader.createClusterStateWatchersAndUpdate(); diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java index d4665a0a05e..e866869e1f8 100644 --- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java +++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java @@ -28,8 +28,8 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import java.util.function.Predicate; +import java.util.function.Supplier; -import org.apache.solr.cloud.CurrentCoreDescriptorProvider; import org.apache.solr.cloud.SolrZkServer; import org.apache.solr.cloud.ZkController; import org.apache.solr.common.AlreadyClosedException; @@ -112,21 +112,17 @@ public class ZkContainer { throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A chroot was specified in ZkHost but the znode doesn't exist. " + zookeeperHost); } - ZkController zkController = new ZkController(cc, zookeeperHost, zkClientConnectTimeout, config, - new CurrentCoreDescriptorProvider() { - @Override - public List getCurrentDescriptors() { - List descriptors = new ArrayList<>( - cc.getLoadedCoreNames().size()); - Collection cores = cc.getCores(); - for (SolrCore core : cores) { - descriptors.add(core.getCoreDescriptor()); - } - return descriptors; - } - }); + Supplier> descriptorsSupplier = () -> { + List descriptors = new ArrayList<>(cc.getLoadedCoreNames().size()); + Collection cores = cc.getCores(); + for (SolrCore core : cores) { + descriptors.add(core.getCoreDescriptor()); + } + return descriptors; + }; + ZkController zkController = new ZkController(cc, zookeeperHost, zkClientConnectTimeout, config, descriptorsSupplier); if (zkRun != null && zkServer.getServers().size() > 1 && confDir == null && boostrapConf == false) { // we are part of an ensemble and we are not uploading the config - pause to give the config time diff --git a/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java b/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java index 39650f28257..c33ed011eb0 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java +++ b/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java @@ -17,15 +17,18 @@ package org.apache.solr.cloud; import java.io.IOException; +import java.util.List; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import org.apache.solr.core.CloudConfig; import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.CoreDescriptor; public class MockSimpleZkController extends ZkController { public MockSimpleZkController(CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig, - CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException, TimeoutException, IOException { + Supplier> registerOnReconnect) throws InterruptedException, TimeoutException, IOException { super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig, registerOnReconnect); } diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java b/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java index 6bbfe1cfe14..c085b2876a8 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java @@ -19,14 +19,12 @@ package org.apache.solr.cloud; import java.lang.invoke.MethodHandles; import java.nio.file.Path; import java.util.Collections; -import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.core.CloudConfig; import org.apache.solr.core.CoreContainer; -import org.apache.solr.core.CoreDescriptor; import org.apache.zookeeper.KeeperException; import org.junit.Test; import org.slf4j.Logger; @@ -55,12 +53,7 @@ public class TestLeaderElectionZkExpiry extends SolrTestCaseJ4 { .setLeaderConflictResolveWait(180000) .setLeaderVoteWait(180000) .build(); - final ZkController zkController = new ZkController(cc, server.getZkAddress(), 15000, cloudConfig, new CurrentCoreDescriptorProvider() { - @Override - public List getCurrentDescriptors() { - return Collections.EMPTY_LIST; - } - }); + final ZkController zkController = new ZkController(cc, server.getZkAddress(), 15000, cloudConfig, () -> Collections.emptyList()); try { Thread killer = new Thread() { @Override diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java index 39b33188eeb..43a75408693 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java @@ -197,15 +197,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 { cc = getCoreContainer(); CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build(); - ZkController zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, - new CurrentCoreDescriptorProvider() { - - @Override - public List getCurrentDescriptors() { - // do nothing - return null; - } - }); + ZkController zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, () -> null); try { String configName = zkController.getZkStateReader().readConfigName(COLLECTION_NAME); assertEquals(configName, actualConfigName); @@ -234,14 +226,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 { try { CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build(); - zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, new CurrentCoreDescriptorProvider() { - - @Override - public List getCurrentDescriptors() { - // do nothing - return null; - } - }); + zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, () -> null); } catch (IllegalArgumentException e) { fail("ZkController did not normalize host name correctly"); } finally { @@ -295,14 +280,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 { try { CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build(); - zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, new CurrentCoreDescriptorProvider() { - - @Override - public List getCurrentDescriptors() { - // do nothing - return null; - } - }); + zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, () -> null); zkControllerRef.set(zkController); zkController.getZkClient().makePath(ZkStateReader.getCollectionPathRoot(collectionName), new byte[0], CreateMode.PERSISTENT, true);