mirror of https://github.com/apache/lucene.git
SOLR-14680: Provide an implementation for the new SolrCluster API (#1730)
This commit is contained in:
parent
a6515ca38f
commit
d517361bb1
|
@ -0,0 +1,97 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cluster.api.CollectionConfig;
|
||||
import org.apache.solr.cluster.api.SimpleMap;
|
||||
import org.apache.solr.cluster.api.SolrCollection;
|
||||
import org.apache.solr.common.LazySolrCluster;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
public class TestLazySolrCluster extends SolrCloudTestCase {
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(5)
|
||||
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
|
||||
.configure();
|
||||
}
|
||||
|
||||
public void test() throws Exception {
|
||||
CloudSolrClient cloudClient = cluster.getSolrClient();
|
||||
String collection = "testLazyCluster1";
|
||||
cloudClient.request(CollectionAdminRequest.createCollection(collection, "conf1", 2, 2));
|
||||
cluster.waitForActiveCollection(collection, 2, 4);
|
||||
collection = "testLazyCluster2";
|
||||
cloudClient.request(CollectionAdminRequest.createCollection(collection, "conf1", 2, 2));
|
||||
cluster.waitForActiveCollection(collection, 2, 4);
|
||||
|
||||
LazySolrCluster solrCluster = new LazySolrCluster(cluster.getSolrClient().getZkStateReader());
|
||||
SimpleMap<SolrCollection> colls = solrCluster.collections();
|
||||
|
||||
SolrCollection c = colls.get("testLazyCluster1");
|
||||
assertNotNull(c);
|
||||
c = colls.get("testLazyCluster2");
|
||||
assertNotNull(c);
|
||||
int[] count = new int[1];
|
||||
solrCluster.collections().forEachEntry((s, solrCollection) -> count[0]++);
|
||||
assertEquals(2, count[0]);
|
||||
|
||||
count[0] = 0;
|
||||
|
||||
assertEquals(2, solrCluster.collections().get("testLazyCluster1").shards().size());
|
||||
solrCluster.collections().get("testLazyCluster1").shards()
|
||||
.forEachEntry((s, shard) -> shard.replicas().forEachEntry((s1, replica) -> count[0]++));
|
||||
assertEquals(4, count[0]);
|
||||
|
||||
assertEquals(5, solrCluster.nodes().size());
|
||||
SolrZkClient zkClient = cloudClient.getZkStateReader().getZkClient();
|
||||
zkClient.create(ZkStateReader.CONFIGS_ZKNODE + "/conf1/a", null, CreateMode.PERSISTENT, true);
|
||||
zkClient.create(ZkStateReader.CONFIGS_ZKNODE + "/conf1/a/aa1", new byte[1024], CreateMode.PERSISTENT, true);
|
||||
zkClient.create(ZkStateReader.CONFIGS_ZKNODE + "/conf1/a/aa2", new byte[1024 * 2], CreateMode.PERSISTENT, true);
|
||||
|
||||
List<String> allFiles = new ArrayList<>();
|
||||
byte[] buf = new byte[3*1024];
|
||||
CollectionConfig conf1 = solrCluster.configs().get("conf1");
|
||||
conf1.resources().abortableForEach((s, resource) -> {
|
||||
allFiles.add(s);
|
||||
if("a/aa1".equals(s)) {
|
||||
resource.get(is -> assertEquals(1024, is.read(buf)));
|
||||
}
|
||||
if("a/aa2".equals(s)) {
|
||||
resource.get(is -> assertEquals(2*1024, is.read(buf)));
|
||||
}
|
||||
if("a".equals(s)) {
|
||||
resource.get(is -> assertEquals(-1, is.read()));
|
||||
}
|
||||
return Boolean.TRUE;
|
||||
});
|
||||
assertEquals(5, allFiles.size());
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,446 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.common;
|
||||
|
||||
import org.apache.solr.cluster.api.*;
|
||||
import org.apache.solr.common.cloud.*;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.common.util.WrappedSimpleMap;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.URL_SCHEME;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.getCollectionPathRoot;
|
||||
|
||||
/**
|
||||
* Reference implementation for SolrCluster.
|
||||
* As much as possible fetch all the values lazily because the value of anything
|
||||
* can change any moment
|
||||
* Creating an instance is a low cost operation. It does not result in a
|
||||
* network call or large object creation
|
||||
*
|
||||
*/
|
||||
|
||||
public class LazySolrCluster implements SolrCluster {
|
||||
final ZkStateReader zkStateReader;
|
||||
|
||||
private final Map<String, SolrCollectionImpl> cached = new ConcurrentHashMap<>();
|
||||
private final SimpleMap<SolrCollection> collections;
|
||||
private final SimpleMap<SolrCollection> collectionsAndAliases;
|
||||
private final SimpleMap<SolrNode> nodes;
|
||||
private SimpleMap<CollectionConfig> configs;
|
||||
|
||||
public LazySolrCluster(ZkStateReader zkStateReader) {
|
||||
this.zkStateReader = zkStateReader;
|
||||
collections = lazyCollectionsMap(zkStateReader);
|
||||
collectionsAndAliases = lazyCollectionsWithAlias(zkStateReader);
|
||||
nodes = lazyNodeMap();
|
||||
}
|
||||
|
||||
private SimpleMap<CollectionConfig> lazyConfigMap() {
|
||||
Set<String> configNames = new HashSet<>();
|
||||
new SimpleZkMap(zkStateReader, ZkStateReader.CONFIGS_ZKNODE)
|
||||
.abortableForEach((name, resource) -> {
|
||||
if (!name.contains("/")) {
|
||||
configNames.add(name);
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
return Boolean.FALSE;
|
||||
});
|
||||
|
||||
return new SimpleMap<>() {
|
||||
@Override
|
||||
public CollectionConfig get(String key) {
|
||||
if (configNames.contains(key)) {
|
||||
return new ConfigImpl(key);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachEntry(BiConsumer<String, ? super CollectionConfig> fun) {
|
||||
for (String name : configNames) {
|
||||
fun.accept(name, new ConfigImpl(name));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return configNames.size();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private SimpleMap<SolrNode> lazyNodeMap() {
|
||||
return new SimpleMap<>() {
|
||||
@Override
|
||||
public SolrNode get(String key) {
|
||||
if (!zkStateReader.getClusterState().liveNodesContain(key)) {
|
||||
return null;
|
||||
}
|
||||
return new Node(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachEntry(BiConsumer<String, ? super SolrNode> fun) {
|
||||
for (String s : zkStateReader.getClusterState().getLiveNodes()) {
|
||||
fun.accept(s, new Node(s));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return zkStateReader.getClusterState().getLiveNodes().size();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private SimpleMap<SolrCollection> lazyCollectionsWithAlias(ZkStateReader zkStateReader) {
|
||||
return new SimpleMap<>() {
|
||||
@Override
|
||||
public SolrCollection get(String key) {
|
||||
SolrCollection result = collections.get(key);
|
||||
if (result != null) return result;
|
||||
Aliases aliases = zkStateReader.getAliases();
|
||||
List<String> aliasNames = aliases.resolveAliases(key);
|
||||
if (aliasNames == null || aliasNames.isEmpty()) return null;
|
||||
return _collection(aliasNames.get(0), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachEntry(BiConsumer<String, ? super SolrCollection> fun) {
|
||||
collections.forEachEntry(fun);
|
||||
Aliases aliases = zkStateReader.getAliases();
|
||||
aliases.forEachAlias((s, colls) -> {
|
||||
if (colls == null || colls.isEmpty()) return;
|
||||
fun.accept(s, _collection(colls.get(0), null));
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return collections.size() + zkStateReader.getAliases().size();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private SimpleMap<SolrCollection> lazyCollectionsMap(ZkStateReader zkStateReader) {
|
||||
return new SimpleMap<>() {
|
||||
@Override
|
||||
public SolrCollection get(String key) {
|
||||
return _collection(key, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachEntry(BiConsumer<String, ? super SolrCollection> fun) {
|
||||
zkStateReader.getClusterState().forEachCollection(coll -> fun.accept(coll.getName(), _collection(coll.getName(), coll)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return zkStateReader.getClusterState().size();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private SolrCollection _collection(String key, DocCollection c) {
|
||||
if (c == null) c = zkStateReader.getCollection(key);
|
||||
if (c == null) {
|
||||
cached.remove(key);
|
||||
return null;
|
||||
}
|
||||
SolrCollectionImpl existing = cached.get(key);
|
||||
if (existing == null || existing.coll != c) {
|
||||
cached.put(key, existing = new SolrCollectionImpl(c, zkStateReader));
|
||||
}
|
||||
return existing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleMap<SolrCollection> collections() throws SolrException {
|
||||
return collections;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleMap<SolrCollection> collections(boolean includeAlias) throws SolrException {
|
||||
return includeAlias ? collectionsAndAliases : collections;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleMap<SolrNode> nodes() throws SolrException {
|
||||
return nodes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleMap<CollectionConfig> configs() throws SolrException {
|
||||
if (configs == null) {
|
||||
//these are lightweight objects and we don't care even if multiple objects ar ecreated b/c of a race condition
|
||||
configs = lazyConfigMap();
|
||||
}
|
||||
return configs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String overseerNode() throws SolrException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String thisNode() {
|
||||
return null;
|
||||
}
|
||||
|
||||
private class SolrCollectionImpl implements SolrCollection {
|
||||
final DocCollection coll;
|
||||
final SimpleMap<Shard> shards;
|
||||
final ZkStateReader zkStateReader;
|
||||
final Router router;
|
||||
String confName;
|
||||
|
||||
private SolrCollectionImpl(DocCollection coll, ZkStateReader zkStateReader) {
|
||||
this.coll = coll;
|
||||
this.zkStateReader = zkStateReader;
|
||||
this.router = key -> coll.getRouter().getTargetSlice(key, null, null, null, null).getName();
|
||||
LinkedHashMap<String, Shard> map = new LinkedHashMap<>();
|
||||
for (Slice slice : coll.getSlices()) {
|
||||
map.put(slice.getName(), new ShardImpl(this, slice));
|
||||
}
|
||||
shards = new WrappedSimpleMap<>(map);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return coll.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleMap<Shard> shards() {
|
||||
return shards;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("rawtypes")
|
||||
public String config() {
|
||||
if (confName == null) {
|
||||
// do this lazily . It's usually not necessary
|
||||
try {
|
||||
byte[] d = zkStateReader.getZkClient().getData(getCollectionPathRoot(coll.getName()), null, null, true);
|
||||
if (d == null || d.length == 0) return null;
|
||||
Map m = (Map) Utils.fromJSON(d);
|
||||
confName = (String) m.get("configName");
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
SimpleZkMap.throwZkExp(e);
|
||||
//cannot read from ZK
|
||||
return null;
|
||||
|
||||
}
|
||||
}
|
||||
return confName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Router router() {
|
||||
return router;
|
||||
}
|
||||
}
|
||||
|
||||
private class ShardImpl implements Shard {
|
||||
final SolrCollectionImpl collection;
|
||||
final Slice slice;
|
||||
final HashRange range;
|
||||
final SimpleMap<ShardReplica> replicas;
|
||||
|
||||
private ShardImpl(SolrCollectionImpl collection, Slice slice) {
|
||||
this.collection = collection;
|
||||
this.slice = slice;
|
||||
range = _range(slice);
|
||||
replicas = _replicas();
|
||||
}
|
||||
|
||||
private SimpleMap<ShardReplica> _replicas() {
|
||||
Map<String, ShardReplica> replicas = new HashMap<>();
|
||||
slice.forEach(replica -> replicas.put(replica.getName(), new ShardReplicaImpl(ShardImpl.this, replica)));
|
||||
return new WrappedSimpleMap<>(replicas);
|
||||
}
|
||||
|
||||
private HashRange _range(Slice slice) {
|
||||
return slice.getRange() == null ?
|
||||
null :
|
||||
new HashRange() {
|
||||
@Override
|
||||
public int min() {
|
||||
return slice.getRange().min;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int max() {
|
||||
return slice.getRange().max;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return slice.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String collection() {
|
||||
return collection.name();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HashRange range() {
|
||||
return range;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleMap<ShardReplica> replicas() {
|
||||
return replicas;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String leader() {
|
||||
Replica leader = slice.getLeader();
|
||||
return leader == null ? null : leader.getName();
|
||||
}
|
||||
}
|
||||
|
||||
private class ShardReplicaImpl implements ShardReplica {
|
||||
private final ShardImpl shard;
|
||||
private final Replica replica;
|
||||
|
||||
private ShardReplicaImpl(ShardImpl shard, Replica replica) {
|
||||
this.shard = shard;
|
||||
this.replica = replica;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return replica.name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String shard() {
|
||||
return shard.name();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String collection() {
|
||||
return shard.collection.name();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String node() {
|
||||
return replica.getNodeName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String core() {
|
||||
return replica.getCoreName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Replica.Type type() {
|
||||
return replica.type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean alive() {
|
||||
return zkStateReader.getClusterState().getLiveNodes().contains(node())
|
||||
&& replica.getState() == Replica.State.ACTIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long indexSize() {
|
||||
//todo implement later
|
||||
throw new UnsupportedOperationException("Not yet implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLeader() {
|
||||
return Objects.equals(shard.leader() , name());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String url(ApiType type) {
|
||||
String base = nodes.get(node()).baseUrl(type);
|
||||
if (type == ApiType.V2) {
|
||||
return base + "/cores/" + core();
|
||||
} else {
|
||||
return base + "/" + core();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class Node implements SolrNode {
|
||||
private final String name;
|
||||
|
||||
private Node(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String baseUrl(ApiType apiType) {
|
||||
return Utils.getBaseUrlForNodeName(name, zkStateReader.getClusterProperty(URL_SCHEME, "http"), apiType == ApiType.V2);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleMap<ShardReplica> cores() {
|
||||
//todo implement later
|
||||
//this requires a call to the node
|
||||
throw new UnsupportedOperationException("Not yet implemented");
|
||||
}
|
||||
}
|
||||
|
||||
private class ConfigImpl implements CollectionConfig {
|
||||
final String name;
|
||||
final SimpleMap<Resource> resources;
|
||||
final String path;
|
||||
|
||||
private ConfigImpl(String name) {
|
||||
this.name = name;
|
||||
path = ZkStateReader.CONFIGS_ZKNODE + "/" + name;
|
||||
this.resources = new SimpleZkMap(zkStateReader, path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleMap<Resource> resources() {
|
||||
return resources;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return name;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,139 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.common;
|
||||
|
||||
import org.apache.solr.cluster.api.Resource;
|
||||
import org.apache.solr.cluster.api.SimpleMap;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
/**A view of ZK as a {@link SimpleMap} impl. This gives a flat view of all paths instead of a tree view
|
||||
* eg: a, b, c , a/a1, a/a2, a/a1/aa1 etc
|
||||
* If possible, use the {@link #abortableForEach(BiFunction)} to traverse
|
||||
* DO not use the {@link #size()} method. It always return 0 because it is very expensive to compute that
|
||||
*
|
||||
*/
|
||||
public class SimpleZkMap implements SimpleMap<Resource> {
|
||||
private final ZkStateReader zkStateReader;
|
||||
private final String basePath;
|
||||
|
||||
static final byte[] EMPTY_BYTES = new byte[0];
|
||||
|
||||
|
||||
public SimpleZkMap(ZkStateReader zkStateReader, String path) {
|
||||
this.zkStateReader = zkStateReader;
|
||||
this.basePath = path;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Resource get(String key) {
|
||||
return readZkNode(basePath + key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abortableForEach(BiFunction<String, ? super Resource, Boolean> fun) {
|
||||
try {
|
||||
recursiveRead("",
|
||||
zkStateReader.getZkClient().getChildren(basePath, null, true),
|
||||
fun);
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
throwZkExp(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachEntry(BiConsumer<String, ? super Resource> fun) {
|
||||
abortableForEach((path, resource) -> {
|
||||
fun.accept(path, resource);
|
||||
return Boolean.TRUE;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
private Resource readZkNode(String path) {
|
||||
return new Resource() {
|
||||
@Override
|
||||
public String name() {
|
||||
return path;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void get(Consumer consumer) throws SolrException {
|
||||
try {
|
||||
byte[] data = zkStateReader.getZkClient().getData(basePath+"/"+ path, null, null, true);
|
||||
if (data != null && data.length > 0) {
|
||||
consumer.read(new ByteArrayInputStream(data));
|
||||
} else {
|
||||
consumer.read(new ByteArrayInputStream(EMPTY_BYTES));
|
||||
}
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
throwZkExp(e);
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can;t read stream" , e);
|
||||
}
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private boolean recursiveRead(String parent, List<String> childrenList, BiFunction<String, ? super Resource, Boolean> fun) {
|
||||
if(childrenList == null || childrenList.isEmpty()) return true;
|
||||
try {
|
||||
Map<String, List<String>> withKids = new LinkedHashMap<>();
|
||||
for (String child : childrenList) {
|
||||
String relativePath = parent.isBlank() ? child: parent+"/"+child;
|
||||
if(!fun.apply(relativePath, readZkNode(relativePath))) return false;
|
||||
List<String> l1 = zkStateReader.getZkClient().getChildren(basePath+ "/"+ relativePath, null, true);
|
||||
if(l1 != null && !l1.isEmpty()) {
|
||||
withKids.put(relativePath, l1);
|
||||
}
|
||||
}
|
||||
//now we iterate through all nodes with sub paths
|
||||
for (Map.Entry<String, List<String>> e : withKids.entrySet()) {
|
||||
//has children
|
||||
if(!recursiveRead(e.getKey(), e.getValue(), fun)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
throwZkExp(e);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static void throwZkExp(Exception e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ZK errror", e);
|
||||
}
|
||||
|
||||
}
|
|
@ -24,6 +24,7 @@ import java.util.List;
|
|||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.UnaryOperator;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -72,6 +73,13 @@ public class Aliases {
|
|||
this.zNodeVersion = zNodeVersion;
|
||||
}
|
||||
|
||||
public void forEachAlias(BiConsumer<String, List<String>> consumer) {
|
||||
collectionAliases.forEach((s, colls) -> consumer.accept(s, Collections.unmodifiableList(colls)));
|
||||
}
|
||||
public int size() {
|
||||
return collectionAliases.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an instance from the JSON bytes read from zookeeper. Generally this should
|
||||
* only be done by a ZkStateReader.
|
||||
|
|
|
@ -380,4 +380,8 @@ public class ClusterState implements JSONWriter.Writable {
|
|||
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return collectionStates.size();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.solr.common.cloud;
|
||||
|
||||
import org.apache.solr.cluster.api.HashRange;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
|
@ -86,7 +87,7 @@ public abstract class DocRouter {
|
|||
// Hash ranges can't currently "wrap" - i.e. max must be greater or equal to min.
|
||||
// TODO: ranges may not be all contiguous in the future (either that or we will
|
||||
// need an extra class to model a collection of ranges)
|
||||
public static class Range implements JSONWriter.Writable, Comparable<Range> {
|
||||
public static class Range implements JSONWriter.Writable, Comparable<Range> , HashRange {
|
||||
public int min; // inclusive
|
||||
public int max; // inclusive
|
||||
|
||||
|
@ -96,6 +97,16 @@ public abstract class DocRouter {
|
|||
this.max = max;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int min() {
|
||||
return min;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int max() {
|
||||
return max;
|
||||
}
|
||||
|
||||
public boolean includes(int hash) {
|
||||
return hash >= min && hash <= max;
|
||||
}
|
||||
|
|
|
@ -51,11 +51,7 @@ import org.apache.solr.common.SolrException;
|
|||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.params.CollectionAdminParams;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
import org.apache.solr.common.util.ObjectReleaseTracker;
|
||||
import org.apache.solr.common.util.Pair;
|
||||
import org.apache.solr.common.util.SolrNamedThreadFactory;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.common.util.*;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.NoNodeException;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
|
@ -2182,4 +2178,8 @@ public class ZkStateReader implements SolrCloseable {
|
|||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
public DocCollection getCollection(String collection) {
|
||||
return clusterState.getCollectionOrNull(collection);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.common.util;
|
||||
|
||||
import org.apache.solr.cluster.api.SimpleMap;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
public class LinkedSimpleHashMap<T> extends LinkedHashMap<String, T> implements SimpleMap<T> {
|
||||
@Override
|
||||
public T get(String key) {
|
||||
return super.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachEntry(BiConsumer<String, ? super T> fun) {
|
||||
|
||||
}
|
||||
}
|
|
@ -30,7 +30,11 @@ import java.util.Map;
|
|||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.solr.cluster.api.SimpleMap;
|
||||
import org.apache.solr.common.MapWriter;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.params.MultiMapSolrParams;
|
||||
|
@ -63,7 +67,7 @@ import org.apache.solr.common.params.SolrParams;
|
|||
*
|
||||
*/
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public class NamedList<T> implements Cloneable, Serializable, Iterable<Map.Entry<String,T>> , MapWriter {
|
||||
public class NamedList<T> implements Cloneable, Serializable, Iterable<Map.Entry<String,T>> , MapWriter, SimpleMap<T> {
|
||||
|
||||
private static final long serialVersionUID = 1957981902839867821L;
|
||||
protected final List<Object> nvPairs;
|
||||
|
@ -509,7 +513,7 @@ public class NamedList<T> implements Cloneable, Serializable, Iterable<Map.Entry
|
|||
|
||||
@Override
|
||||
public void forEach(BiConsumer action) {
|
||||
NamedList.this.forEach(action);
|
||||
NamedList.this.forEachEntry(action);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -854,10 +858,39 @@ public class NamedList<T> implements Cloneable, Serializable, Iterable<Map.Entry
|
|||
return this.nvPairs.equals(nl.nvPairs);
|
||||
}
|
||||
|
||||
public void forEach(BiConsumer<String, T> action) {
|
||||
|
||||
@Override
|
||||
public void abortableForEach(BiFunction<String, ? super T, Boolean> fun) {
|
||||
int sz = size();
|
||||
for (int i = 0; i < sz; i++) {
|
||||
if(!fun.apply(getName(i), getVal(i))) break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abortableForEachKey(Function<String, Boolean> fun) {
|
||||
int sz = size();
|
||||
for (int i = 0; i < sz; i++) {
|
||||
if(!fun.apply(getName(i))) break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachKey(Consumer<String> fun) {
|
||||
int sz = size();
|
||||
for (int i = 0; i < sz; i++) {
|
||||
fun.accept(getName(i));
|
||||
}
|
||||
}
|
||||
public void forEach(BiConsumer<String, ? super T> action) {
|
||||
int sz = size();
|
||||
for (int i = 0; i < sz; i++) {
|
||||
action.accept(getName(i), getVal(i));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachEntry(BiConsumer<String, ? super T> fun) {
|
||||
forEach(fun);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -728,13 +728,16 @@ public class Utils {
|
|||
}
|
||||
|
||||
public static String getBaseUrlForNodeName(final String nodeName, String urlScheme) {
|
||||
return getBaseUrlForNodeName(nodeName, urlScheme, false);
|
||||
}
|
||||
public static String getBaseUrlForNodeName(final String nodeName, String urlScheme, boolean isV2) {
|
||||
final int _offset = nodeName.indexOf("_");
|
||||
if (_offset < 0) {
|
||||
throw new IllegalArgumentException("nodeName does not contain expected '_' separator: " + nodeName);
|
||||
}
|
||||
final String hostAndPort = nodeName.substring(0, _offset);
|
||||
final String path = URLDecoder.decode(nodeName.substring(1 + _offset), UTF_8);
|
||||
return urlScheme + "://" + hostAndPort + (path.isEmpty() ? "" : ("/" + path));
|
||||
return urlScheme + "://" + hostAndPort + (path.isEmpty() ? "" : ("/" + (isV2? "api": path)));
|
||||
}
|
||||
|
||||
public static long time(TimeSource timeSource, TimeUnit unit) {
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.common.util;
|
||||
|
||||
import org.apache.solr.cluster.api.SimpleMap;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
public class WrappedSimpleMap<T> implements SimpleMap<T> {
|
||||
private final Map<String, T> delegate;
|
||||
|
||||
@Override
|
||||
public T get(String key) {
|
||||
return delegate.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachEntry(BiConsumer<String, ? super T> fun) {
|
||||
delegate.forEach(fun);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return delegate.size();
|
||||
}
|
||||
|
||||
|
||||
public WrappedSimpleMap(Map<String, T> delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue