Remove CurrentCoreDescriptorProvider (#1384)

* Remove CurrentCoreDescriptorProvider

Replace `CurrentCoreDescriptorProvider` with a functional interface so
that it is easier to construct since all implementations in our code
base were anonymous classes anyway. Added Javadocs explaining the
usage instead of relying on class name to convey information.
This commit is contained in:
Mike 2020-03-27 14:31:37 -07:00 committed by GitHub
parent 8cb50a52bc
commit ac866a67de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 41 additions and 95 deletions

View File

@ -1,28 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.util.List;
import org.apache.solr.core.CoreDescriptor;
/**
* Provide the current list of registered {@link CoreDescriptor}s.
*/
public abstract class CurrentCoreDescriptorProvider {
public abstract List<CoreDescriptor> getCurrentDescriptors();
}

View File

@ -48,6 +48,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;
@ -287,7 +288,14 @@ public class ZkController implements Closeable {
}
}
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig, final CurrentCoreDescriptorProvider registerOnReconnect)
/**
* @param cc Core container associated with this controller. cannot be null.
* @param zkServerAddress where to connect to the zk server
* @param zkClientConnectTimeout timeout in ms
* @param cloudConfig configuration for this controller. TODO: possibly redundant with CoreContainer
* @param descriptorsSupplier a supplier of the current core descriptors. used to know which cores to re-register on reconnect
*/
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig, final Supplier<List<CoreDescriptor>> descriptorsSupplier)
throws InterruptedException, TimeoutException, IOException {
if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
@ -364,13 +372,14 @@ public class ZkController implements Closeable {
}
overseerElector.setup(context);
overseerElector.joinElection(context, true);
}
cc.cancelCoreRecoveries();
try {
registerAllCoresAsDown(registerOnReconnect, false);
registerAllCoresAsDown(descriptorsSupplier, false);
} catch (SessionExpiredException e) {
// zk has to reconnect and this will all be tried again
throw e;
@ -383,7 +392,7 @@ public class ZkController implements Closeable {
// we have to register as live first to pick up docs in the buffer
createEphemeralLiveNode();
List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
List<CoreDescriptor> descriptors = descriptorsSupplier.get();
// re register all descriptors
ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
if (descriptors != null) {
@ -449,8 +458,8 @@ public class ZkController implements Closeable {
} catch (Exception e) {
log.error("Error trying to stop any Overseer threads", e);
}
closeOutstandingElections(registerOnReconnect);
markAllAsNotLeader(registerOnReconnect);
closeOutstandingElections(descriptorsSupplier);
markAllAsNotLeader(descriptorsSupplier);
}
}, zkACLProvider, new ConnectionManager.IsClosed() {
@ -469,7 +478,7 @@ public class ZkController implements Closeable {
if (cc != null) cc.securityNodeChanged();
});
init(registerOnReconnect);
init();
this.overseerJobQueue = overseer.getStateUpdateQueue();
this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient);
@ -489,9 +498,8 @@ public class ZkController implements Closeable {
}
private void registerAllCoresAsDown(
final CurrentCoreDescriptorProvider registerOnReconnect, boolean updateLastPublished) throws SessionExpiredException {
List<CoreDescriptor> descriptors = registerOnReconnect
.getCurrentDescriptors();
final Supplier<List<CoreDescriptor>> registerOnReconnect, boolean updateLastPublished) throws SessionExpiredException {
List<CoreDescriptor> descriptors = registerOnReconnect.get();
if (isClosed) return;
if (descriptors != null) {
// before registering as live, make sure everyone is in a
@ -546,9 +554,8 @@ public class ZkController implements Closeable {
return sysPropsCacher;
}
private void closeOutstandingElections(final CurrentCoreDescriptorProvider registerOnReconnect) {
List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
private void closeOutstandingElections(final Supplier<List<CoreDescriptor>> registerOnReconnect) {
List<CoreDescriptor> descriptors = registerOnReconnect.get();
if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
closeExistingElectionContext(descriptor);
@ -572,10 +579,8 @@ public class ZkController implements Closeable {
return contextKey;
}
private void markAllAsNotLeader(
final CurrentCoreDescriptorProvider registerOnReconnect) {
List<CoreDescriptor> descriptors = registerOnReconnect
.getCurrentDescriptors();
private void markAllAsNotLeader(final Supplier<List<CoreDescriptor>> registerOnReconnect) {
List<CoreDescriptor> descriptors = registerOnReconnect.get();
if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
descriptor.getCloudDescriptor().setLeader(false);
@ -772,7 +777,7 @@ public class ZkController implements Closeable {
// normalize host removing any url scheme.
// input can be null, host, or url_prefix://host
private String normalizeHostName(String host) throws IOException {
private String normalizeHostName(String host) {
if (host == null || host.length() == 0) {
String hostaddress;
@ -895,8 +900,7 @@ public class ZkController implements Closeable {
return configDirPath;
}
private void init(CurrentCoreDescriptorProvider registerOnReconnect) {
private void init() {
try {
createClusterZkNodes(zkClient);
zkStateReader.createClusterStateWatchersAndUpdate();

View File

@ -28,8 +28,8 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.solr.cloud.CurrentCoreDescriptorProvider;
import org.apache.solr.cloud.SolrZkServer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.AlreadyClosedException;
@ -112,21 +112,17 @@ public class ZkContainer {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"A chroot was specified in ZkHost but the znode doesn't exist. " + zookeeperHost);
}
ZkController zkController = new ZkController(cc, zookeeperHost, zkClientConnectTimeout, config,
new CurrentCoreDescriptorProvider() {
@Override
public List<CoreDescriptor> getCurrentDescriptors() {
List<CoreDescriptor> descriptors = new ArrayList<>(
cc.getLoadedCoreNames().size());
Collection<SolrCore> cores = cc.getCores();
for (SolrCore core : cores) {
descriptors.add(core.getCoreDescriptor());
}
return descriptors;
}
});
Supplier<List<CoreDescriptor>> descriptorsSupplier = () -> {
List<CoreDescriptor> descriptors = new ArrayList<>(cc.getLoadedCoreNames().size());
Collection<SolrCore> cores = cc.getCores();
for (SolrCore core : cores) {
descriptors.add(core.getCoreDescriptor());
}
return descriptors;
};
ZkController zkController = new ZkController(cc, zookeeperHost, zkClientConnectTimeout, config, descriptorsSupplier);
if (zkRun != null && zkServer.getServers().size() > 1 && confDir == null && boostrapConf == false) {
// we are part of an ensemble and we are not uploading the config - pause to give the config time

View File

@ -17,15 +17,18 @@
package org.apache.solr.cloud;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
public class MockSimpleZkController extends ZkController {
public MockSimpleZkController(CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig,
CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException, TimeoutException, IOException {
Supplier<List<CoreDescriptor>> registerOnReconnect) throws InterruptedException, TimeoutException, IOException {
super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig, registerOnReconnect);
}

View File

@ -19,14 +19,12 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.slf4j.Logger;
@ -55,12 +53,7 @@ public class TestLeaderElectionZkExpiry extends SolrTestCaseJ4 {
.setLeaderConflictResolveWait(180000)
.setLeaderVoteWait(180000)
.build();
final ZkController zkController = new ZkController(cc, server.getZkAddress(), 15000, cloudConfig, new CurrentCoreDescriptorProvider() {
@Override
public List<CoreDescriptor> getCurrentDescriptors() {
return Collections.EMPTY_LIST;
}
});
final ZkController zkController = new ZkController(cc, server.getZkAddress(), 15000, cloudConfig, () -> Collections.emptyList());
try {
Thread killer = new Thread() {
@Override

View File

@ -197,15 +197,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
cc = getCoreContainer();
CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build();
ZkController zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig,
new CurrentCoreDescriptorProvider() {
@Override
public List<CoreDescriptor> getCurrentDescriptors() {
// do nothing
return null;
}
});
ZkController zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, () -> null);
try {
String configName = zkController.getZkStateReader().readConfigName(COLLECTION_NAME);
assertEquals(configName, actualConfigName);
@ -234,14 +226,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
try {
CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build();
zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, new CurrentCoreDescriptorProvider() {
@Override
public List<CoreDescriptor> getCurrentDescriptors() {
// do nothing
return null;
}
});
zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, () -> null);
} catch (IllegalArgumentException e) {
fail("ZkController did not normalize host name correctly");
} finally {
@ -295,14 +280,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
try {
CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build();
zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, new CurrentCoreDescriptorProvider() {
@Override
public List<CoreDescriptor> getCurrentDescriptors() {
// do nothing
return null;
}
});
zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, () -> null);
zkControllerRef.set(zkController);
zkController.getZkClient().makePath(ZkStateReader.getCollectionPathRoot(collectionName), new byte[0], CreateMode.PERSISTENT, true);