mirror of https://github.com/apache/lucene.git
SOLR-13276: Adding Http2 equivalent classes of CloudSolrClient and HttpClusterStateProvider
This commit is contained in:
parent
1f9c767aac
commit
e99934b240
|
@ -278,6 +278,8 @@ New Features
|
|||
|
||||
* SOLR-13241: Add 'autoscaling' tool support to solr.cmd (Jason Gerlowski)
|
||||
|
||||
* SOLR-13276: Adding Http2 equivalent classes of CloudSolrClient and HttpClusterStateProvider (Cao Manh Dat)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,309 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.common.cloud.Aliases;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.SimpleOrderedMap;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.*;
|
||||
|
||||
public abstract class BaseHttpClusterStateProvider implements ClusterStateProvider {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private String urlScheme;
|
||||
volatile Set<String> liveNodes;
|
||||
long liveNodesTimestamp = 0;
|
||||
volatile Map<String, List<String>> aliases;
|
||||
long aliasesTimestamp = 0;
|
||||
|
||||
private int cacheTimeout = 5; // the liveNodes and aliases cache will be invalidated after 5 secs
|
||||
|
||||
public void init(List<String> solrUrls) throws Exception {
|
||||
for (String solrUrl: solrUrls) {
|
||||
urlScheme = solrUrl.startsWith("https")? "https": "http";
|
||||
try (SolrClient initialClient = getSolrClient(solrUrl)) {
|
||||
this.liveNodes = fetchLiveNodes(initialClient);
|
||||
liveNodesTimestamp = System.nanoTime();
|
||||
break;
|
||||
} catch (IOException e) {
|
||||
log.warn("Attempt to fetch cluster state from {} failed.", solrUrl, e);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.liveNodes == null || this.liveNodes.isEmpty()) {
|
||||
throw new RuntimeException("Tried fetching live_nodes using Solr URLs provided, i.e. " + solrUrls + ". However, "
|
||||
+ "succeeded in obtaining the cluster state from none of them."
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract SolrClient getSolrClient(String baseUrl);
|
||||
|
||||
@Override
|
||||
public ClusterState.CollectionRef getState(String collection) {
|
||||
for (String nodeName: liveNodes) {
|
||||
String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
|
||||
try (SolrClient client = getSolrClient(baseUrl)) {
|
||||
ClusterState cs = fetchClusterState(client, collection, null);
|
||||
return cs.getCollectionRef(collection);
|
||||
} catch (SolrServerException | IOException e) {
|
||||
log.warn("Attempt to fetch cluster state from " +
|
||||
Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
|
||||
} catch (RemoteSolrException e) {
|
||||
if ("NOT_FOUND".equals(e.getMetadata("CLUSTERSTATUS"))) {
|
||||
return null;
|
||||
}
|
||||
log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
|
||||
} catch (NotACollectionException e) {
|
||||
// Cluster state for the given collection was not found, could be an alias.
|
||||
// Lets fetch/update our aliases:
|
||||
getAliases(true);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
|
||||
+ "succeeded in obtaining the cluster state from none of them."
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
private ClusterState fetchClusterState(SolrClient client, String collection, Map<String, Object> clusterProperties) throws SolrServerException, IOException, NotACollectionException {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
if (collection != null) {
|
||||
params.set("collection", collection);
|
||||
}
|
||||
params.set("action", "CLUSTERSTATUS");
|
||||
QueryRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
|
||||
Map<String, Object> collectionsMap;
|
||||
if (collection != null) {
|
||||
collectionsMap = Collections.singletonMap(collection,
|
||||
((NamedList) cluster.get("collections")).get(collection));
|
||||
} else {
|
||||
collectionsMap = ((NamedList)cluster.get("collections")).asMap(10);
|
||||
}
|
||||
int znodeVersion;
|
||||
Map<String, Object> collFromStatus = (Map<String, Object>) (collectionsMap).get(collection);
|
||||
if (collection != null && collFromStatus == null) {
|
||||
throw new NotACollectionException(); // probably an alias
|
||||
}
|
||||
if (collection != null) { // can be null if alias
|
||||
znodeVersion = (int) collFromStatus.get("znodeVersion");
|
||||
} else {
|
||||
znodeVersion = -1;
|
||||
}
|
||||
Set<String> liveNodes = new HashSet((List<String>)(cluster.get("live_nodes")));
|
||||
this.liveNodes = liveNodes;
|
||||
liveNodesTimestamp = System.nanoTime();
|
||||
//TODO SOLR-11877 we don't know the znode path; CLUSTER_STATE is probably wrong leading to bad stateFormat
|
||||
ClusterState cs = ClusterState.load(znodeVersion, collectionsMap, liveNodes, ZkStateReader.CLUSTER_STATE);
|
||||
if (clusterProperties != null) {
|
||||
Map<String, Object> properties = (Map<String, Object>) cluster.get("properties");
|
||||
if (properties != null) {
|
||||
clusterProperties.putAll(properties);
|
||||
}
|
||||
}
|
||||
return cs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getLiveNodes() {
|
||||
if (liveNodes == null) {
|
||||
throw new RuntimeException("We don't know of any live_nodes to fetch the"
|
||||
+ " latest live_nodes information from. "
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
}
|
||||
if (TimeUnit.SECONDS.convert((System.nanoTime() - liveNodesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
|
||||
for (String nodeName: liveNodes) {
|
||||
String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
|
||||
try (SolrClient client = getSolrClient(baseUrl)) {
|
||||
Set<String> liveNodes = fetchLiveNodes(client);
|
||||
this.liveNodes = (liveNodes);
|
||||
liveNodesTimestamp = System.nanoTime();
|
||||
return liveNodes;
|
||||
} catch (Exception e) {
|
||||
log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("Tried fetching live_nodes using all the node names we knew of, i.e. " + liveNodes +". However, "
|
||||
+ "succeeded in obtaining the cluster state from none of them."
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
} else {
|
||||
return liveNodes; // cached copy is fresh enough
|
||||
}
|
||||
}
|
||||
|
||||
private static Set<String> fetchLiveNodes(SolrClient client) throws Exception {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("action", "CLUSTERSTATUS");
|
||||
QueryRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
|
||||
return (Set<String>) new HashSet((List<String>)(cluster.get("live_nodes")));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> resolveAlias(String aliasName) {
|
||||
return Aliases.resolveAliasesGivenAliasMap(getAliases(false), aliasName);
|
||||
}
|
||||
|
||||
private Map<String, List<String>> getAliases(boolean forceFetch) {
|
||||
if (this.liveNodes == null) {
|
||||
throw new RuntimeException("We don't know of any live_nodes to fetch the"
|
||||
+ " latest aliases information from. "
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
}
|
||||
|
||||
if (forceFetch || this.aliases == null ||
|
||||
TimeUnit.SECONDS.convert((System.nanoTime() - aliasesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
|
||||
for (String nodeName: liveNodes) {
|
||||
String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
|
||||
try (SolrClient client = getSolrClient(baseUrl)) {
|
||||
|
||||
this.aliases = new CollectionAdminRequest.ListAliases().process(client).getAliasesAsLists();
|
||||
this.aliasesTimestamp = System.nanoTime();
|
||||
return Collections.unmodifiableMap(this.aliases);
|
||||
} catch (SolrServerException | RemoteSolrException | IOException e) {
|
||||
// Situation where we're hitting an older Solr which doesn't have LISTALIASES
|
||||
if (e instanceof RemoteSolrException && ((RemoteSolrException)e).code()==400) {
|
||||
log.warn("LISTALIASES not found, possibly using older Solr server. Aliases won't work"
|
||||
+ " unless you re-create the CloudSolrClient using zkHost(s) or upgrade Solr server", e);
|
||||
this.aliases = Collections.emptyMap();
|
||||
this.aliasesTimestamp = System.nanoTime();
|
||||
return aliases;
|
||||
}
|
||||
log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
|
||||
}
|
||||
}
|
||||
|
||||
throw new RuntimeException("Tried fetching aliases using all the node names we knew of, i.e. " + liveNodes +". However, "
|
||||
+ "succeeded in obtaining the cluster state from none of them."
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using a working"
|
||||
+ " solrUrl or zkHost.");
|
||||
} else {
|
||||
return Collections.unmodifiableMap(this.aliases); // cached copy is fresh enough
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState getClusterState() throws IOException {
|
||||
for (String nodeName: liveNodes) {
|
||||
String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
|
||||
try (SolrClient client = getSolrClient(baseUrl)) {
|
||||
return fetchClusterState(client, null, null);
|
||||
} catch (SolrServerException | HttpSolrClient.RemoteSolrException | IOException e) {
|
||||
log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
|
||||
} catch (NotACollectionException e) {
|
||||
// not possible! (we passed in null for collection so it can't be an alias)
|
||||
throw new RuntimeException("null should never cause NotACollectionException in " +
|
||||
"fetchClusterState() Please report this as a bug!");
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
|
||||
+ "succeeded in obtaining the cluster state from none of them."
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getClusterProperties() {
|
||||
for (String nodeName : liveNodes) {
|
||||
String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
|
||||
try (SolrClient client = getSolrClient(baseUrl)) {
|
||||
Map<String, Object> clusterProperties = new HashMap<>();
|
||||
fetchClusterState(client, null, clusterProperties);
|
||||
return clusterProperties;
|
||||
} catch (SolrServerException | HttpSolrClient.RemoteSolrException | IOException e) {
|
||||
log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
|
||||
} catch (NotACollectionException e) {
|
||||
// not possible! (we passed in null for collection so it can't be an alias)
|
||||
throw new RuntimeException("null should never cause NotACollectionException in " +
|
||||
"fetchClusterState() Please report this as a bug!");
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes + ". However, "
|
||||
+ "succeeded in obtaining the cluster state from none of them."
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPolicyNameByCollection(String coll) {
|
||||
throw new UnsupportedOperationException("Fetching cluster properties not supported"
|
||||
+ " using the HttpClusterStateProvider. "
|
||||
+ "ZkClientClusterStateProvider can be used for this."); // TODO
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getClusterProperty(String propertyName) {
|
||||
if (propertyName.equals(ZkStateReader.URL_SCHEME)) {
|
||||
return this.urlScheme;
|
||||
}
|
||||
return getClusterProperties().get(propertyName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect() {}
|
||||
|
||||
public int getCacheTimeout() {
|
||||
return cacheTimeout;
|
||||
}
|
||||
|
||||
public void setCacheTimeout(int cacheTimeout) {
|
||||
this.cacheTimeout = cacheTimeout;
|
||||
}
|
||||
|
||||
// This exception is not meant to escape this class it should be caught and wrapped.
|
||||
private class NotACollectionException extends Exception {
|
||||
}
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.impl;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
|
||||
import static org.apache.solr.common.util.Utils.getObjectByPath;
|
||||
|
||||
public abstract class BaseHttpSolrClient extends SolrClient {
|
||||
|
||||
/**
|
||||
* Subclass of SolrException that allows us to capture an arbitrary HTTP
|
||||
* status code that may have been returned by the remote server or a
|
||||
* proxy along the way.
|
||||
*/
|
||||
public static class RemoteSolrException extends SolrException {
|
||||
/**
|
||||
* @param remoteHost the host the error was received from
|
||||
* @param code Arbitrary HTTP status code
|
||||
* @param msg Exception Message
|
||||
* @param th Throwable to wrap with this Exception
|
||||
*/
|
||||
public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
|
||||
super(code, "Error from server at " + remoteHost + ": " + msg, th);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This should be thrown when a server has an error in executing the request and
|
||||
* it sends a proper payload back to the client
|
||||
*/
|
||||
public static class RemoteExecutionException extends HttpSolrClient.RemoteSolrException {
|
||||
private NamedList meta;
|
||||
|
||||
public RemoteExecutionException(String remoteHost, int code, String msg, NamedList meta) {
|
||||
super(remoteHost, code, msg, null);
|
||||
this.meta = meta;
|
||||
}
|
||||
|
||||
|
||||
public static HttpSolrClient.RemoteExecutionException create(String host, NamedList errResponse) {
|
||||
Object errObj = errResponse.get("error");
|
||||
if (errObj != null) {
|
||||
Number code = (Number) getObjectByPath(errObj, true, Collections.singletonList("code"));
|
||||
String msg = (String) getObjectByPath(errObj, true, Collections.singletonList("msg"));
|
||||
return new HttpSolrClient.RemoteExecutionException(host, code == null ? ErrorCode.UNKNOWN.code : code.intValue(),
|
||||
msg == null ? "Unknown Error" : msg, errResponse);
|
||||
|
||||
} else {
|
||||
throw new RuntimeException("No error");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public NamedList getMetaData() {
|
||||
|
||||
return meta;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,237 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.common.SolrException;
|
||||
|
||||
/**
|
||||
* SolrJ client class to communicate with SolrCloud using Http2SolrClient.
|
||||
* Instances of this class communicate with Zookeeper to discover
|
||||
* Solr endpoints for SolrCloud collections, and then use the
|
||||
* {@link LBHttp2SolrClient} to issue requests.
|
||||
*
|
||||
* This class assumes the id field for your documents is called
|
||||
* 'id' - if this is not the case, you must set the right name
|
||||
* with {@link #setIdField(String)}.
|
||||
*
|
||||
* @lucene.experimental
|
||||
* @since solr 8.0
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class CloudHttp2SolrClient extends BaseCloudSolrClient {
|
||||
|
||||
private final ClusterStateProvider stateProvider;
|
||||
private final LBHttp2SolrClient lbClient;
|
||||
private Http2SolrClient myClient;
|
||||
private final boolean clientIsInternal;
|
||||
|
||||
/**
|
||||
* Create a new client object that connects to Zookeeper and is always aware
|
||||
* of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
|
||||
* SolrCloud has enough replicas for every shard in a collection, there is no
|
||||
* single point of failure. Updates will be sent to shard leaders by default.
|
||||
*
|
||||
* @param builder a {@link Http2SolrClient.Builder} with the options used to create the client.
|
||||
*/
|
||||
protected CloudHttp2SolrClient(Builder builder) {
|
||||
super(builder.shardLeadersOnly, builder.parallelUpdates, builder.directUpdatesToLeadersOnly);
|
||||
this.clientIsInternal = builder.httpClient == null;
|
||||
this.myClient = (builder.httpClient == null) ? new Http2SolrClient.Builder().build() : builder.httpClient;
|
||||
if (builder.stateProvider == null) {
|
||||
if (builder.zkHosts != null && builder.solrUrls != null) {
|
||||
throw new IllegalArgumentException("Both zkHost(s) & solrUrl(s) have been specified. Only specify one.");
|
||||
}
|
||||
if (builder.zkHosts != null) {
|
||||
this.stateProvider = new ZkClientClusterStateProvider(builder.zkHosts, builder.zkChroot);
|
||||
} else if (builder.solrUrls != null && !builder.solrUrls.isEmpty()) {
|
||||
try {
|
||||
this.stateProvider = new Http2ClusterStateProvider(builder.solrUrls, builder.httpClient);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Couldn't initialize a HttpClusterStateProvider (is/are the "
|
||||
+ "Solr server(s), " + builder.solrUrls + ", down?)", e);
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null.");
|
||||
}
|
||||
} else {
|
||||
this.stateProvider = builder.stateProvider;
|
||||
}
|
||||
this.lbClient = new LBHttp2SolrClient(myClient);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
stateProvider.close();
|
||||
lbClient.close();
|
||||
|
||||
if (clientIsInternal && myClient!=null) {
|
||||
myClient.close();
|
||||
}
|
||||
|
||||
super.close();
|
||||
}
|
||||
|
||||
public LBHttp2SolrClient getLbClient() {
|
||||
return lbClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterStateProvider getClusterStateProvider() {
|
||||
return stateProvider;
|
||||
}
|
||||
|
||||
public Http2SolrClient getHttpClient() {
|
||||
return myClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean wasCommError(Throwable rootCause) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs {@link CloudHttp2SolrClient} instances from provided configuration.
|
||||
*/
|
||||
public static class Builder {
|
||||
protected Collection<String> zkHosts = new ArrayList<>();
|
||||
protected List<String> solrUrls = new ArrayList<>();
|
||||
protected String zkChroot;
|
||||
protected Http2SolrClient httpClient;
|
||||
protected boolean shardLeadersOnly = true;
|
||||
protected boolean directUpdatesToLeadersOnly = false;
|
||||
protected boolean parallelUpdates = true;
|
||||
protected ClusterStateProvider stateProvider;
|
||||
|
||||
/**
|
||||
* Provide a series of Solr URLs to be used when configuring {@link CloudHttp2SolrClient} instances.
|
||||
* The solr client will use these urls to understand the cluster topology, which solr nodes are active etc.
|
||||
*
|
||||
* Provided Solr URLs are expected to point to the root Solr path ("http://hostname:8983/solr"); they should not
|
||||
* include any collections, cores, or other path components.
|
||||
*
|
||||
* Usage example:
|
||||
*
|
||||
* <pre>
|
||||
* final List<String> solrBaseUrls = new ArrayList<String>();
|
||||
* solrBaseUrls.add("http://solr1:8983/solr"); solrBaseUrls.add("http://solr2:8983/solr"); solrBaseUrls.add("http://solr3:8983/solr");
|
||||
* final SolrClient client = new CloudHttp2SolrClient.Builder(solrBaseUrls).build();
|
||||
* </pre>
|
||||
*/
|
||||
public Builder(List<String> solrUrls) {
|
||||
this.solrUrls = solrUrls;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide a series of ZK hosts which will be used when configuring {@link CloudHttp2SolrClient} instances.
|
||||
*
|
||||
* Usage example when Solr stores data at the ZooKeeper root ('/'):
|
||||
*
|
||||
* <pre>
|
||||
* final List<String> zkServers = new ArrayList<String>();
|
||||
* zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); zkServers.add("zookeeper3:2181");
|
||||
* final SolrClient client = new CloudHttp2SolrClient.Builder(zkServers, Optional.empty()).build();
|
||||
* </pre>
|
||||
*
|
||||
* Usage example when Solr data is stored in a ZooKeeper chroot:
|
||||
*
|
||||
* <pre>
|
||||
* final List<String> zkServers = new ArrayList<String>();
|
||||
* zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); zkServers.add("zookeeper3:2181");
|
||||
* final SolrClient client = new CloudHttp2SolrClient.Builder(zkServers, Optional.of("/solr")).build();
|
||||
* </pre>
|
||||
*
|
||||
* @param zkHosts a List of at least one ZooKeeper host and port (e.g. "zookeeper1:2181")
|
||||
* @param zkChroot the path to the root ZooKeeper node containing Solr data. Provide {@code java.util.Optional.empty()} if no ZK chroot is used.
|
||||
*/
|
||||
public Builder(List<String> zkHosts, Optional<String> zkChroot) {
|
||||
this.zkHosts = zkHosts;
|
||||
if (zkChroot.isPresent()) this.zkChroot = zkChroot.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tells {@link CloudHttp2SolrClient.Builder} that created clients should send direct updates to shard leaders only.
|
||||
*
|
||||
* UpdateRequests whose leaders cannot be found will "fail fast" on the client side with a {@link SolrException}
|
||||
*/
|
||||
public Builder sendDirectUpdatesToShardLeadersOnly() {
|
||||
directUpdatesToLeadersOnly = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tells {@link CloudHttp2SolrClient.Builder} that created clients can send updates to any shard replica (shard leaders and non-leaders).
|
||||
*
|
||||
* Shard leaders are still preferred, but the created clients will fallback to using other replicas if a leader
|
||||
* cannot be found.
|
||||
*/
|
||||
public Builder sendDirectUpdatesToAnyShardReplica() {
|
||||
directUpdatesToLeadersOnly = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tells {@link CloudHttp2SolrClient.Builder} whether created clients should send shard updates serially or in parallel
|
||||
*
|
||||
* When an {@link UpdateRequest} affects multiple shards, {@link CloudHttp2SolrClient} splits it up and sends a request
|
||||
* to each affected shard. This setting chooses whether those sub-requests are sent serially or in parallel.
|
||||
* <p>
|
||||
* If not set, this defaults to 'true' and sends sub-requests in parallel.
|
||||
*/
|
||||
public Builder withParallelUpdates(boolean parallelUpdates) {
|
||||
this.parallelUpdates = parallelUpdates;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withHttpClient(Http2SolrClient httpClient) {
|
||||
this.httpClient = httpClient;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link CloudHttp2SolrClient} based on the provided configuration.
|
||||
*/
|
||||
public CloudHttp2SolrClient build() {
|
||||
if (stateProvider == null) {
|
||||
if (!zkHosts.isEmpty()) {
|
||||
stateProvider = new ZkClientClusterStateProvider(zkHosts, zkChroot);
|
||||
}
|
||||
else if (!this.solrUrls.isEmpty()) {
|
||||
try {
|
||||
stateProvider = new Http2ClusterStateProvider(solrUrls, httpClient);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Couldn't initialize a HttpClusterStateProvider (is/are the "
|
||||
+ "Solr server(s), " + solrUrls + ", down?)", e);
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null.");
|
||||
}
|
||||
}
|
||||
return new CloudHttp2SolrClient(this);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
|
||||
public class Http2ClusterStateProvider extends BaseHttpClusterStateProvider {
|
||||
final Http2SolrClient httpClient;
|
||||
final boolean closeClient;
|
||||
|
||||
public Http2ClusterStateProvider(List<String> solrUrls, Http2SolrClient httpClient) throws Exception {
|
||||
this.httpClient = httpClient == null? new Http2SolrClient.Builder().build(): httpClient;
|
||||
this.closeClient = httpClient == null;
|
||||
init(solrUrls);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (this.closeClient && this.httpClient != null) {
|
||||
httpClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SolrClient getSolrClient(String baseUrl) {
|
||||
return new Http2SolrClient.Builder(baseUrl).withHttpClient(httpClient).build();
|
||||
}
|
||||
}
|
|
@ -93,10 +93,18 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.*;
|
||||
import static org.apache.solr.common.util.Utils.getObjectByPath;
|
||||
|
||||
// TODO: error handling, small Http2SolrClient features, security, ssl
|
||||
/**
|
||||
* Difference between this {@link Http2SolrClient} and {@link HttpSolrClient}:
|
||||
* <ul>
|
||||
* <li>{@link Http2SolrClient} sends requests in HTTP/2</li>
|
||||
* <li>{@link Http2SolrClient} can point to multiple urls</li>
|
||||
* <li>{@link Http2SolrClient} does not expose its internal httpClient like {@link HttpSolrClient#getHttpClient()},
|
||||
* sharing connection pools should be done by {@link Http2SolrClient.Builder#withHttpClient(Http2SolrClient)} </li>
|
||||
* </ul>
|
||||
* @lucene.experimental
|
||||
*/
|
||||
public class Http2SolrClient extends SolrClient {
|
||||
|
@ -139,20 +147,12 @@ public class Http2SolrClient extends SolrClient {
|
|||
if (builder.idleTimeout != null) idleTimeout = builder.idleTimeout;
|
||||
else idleTimeout = HttpClientUtil.DEFAULT_SO_TIMEOUT;
|
||||
|
||||
if (builder.httpClient == null) {
|
||||
if (builder.http2SolrClient == null) {
|
||||
httpClient = createHttpClient(builder);
|
||||
closeClient = true;
|
||||
} else {
|
||||
httpClient = builder.httpClient;
|
||||
httpClient = builder.http2SolrClient.httpClient;
|
||||
}
|
||||
if (!httpClient.isStarted()) {
|
||||
try {
|
||||
httpClient.start();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
assert ObjectReleaseTracker.track(this);
|
||||
}
|
||||
|
||||
|
@ -160,10 +160,12 @@ public class Http2SolrClient extends SolrClient {
|
|||
this.listenerFactory.add(factory);
|
||||
}
|
||||
|
||||
// internal usage only
|
||||
HttpClient getHttpClient() {
|
||||
return httpClient;
|
||||
}
|
||||
|
||||
// internal usage only
|
||||
ProtocolHandlers getProtocolHandlers() {
|
||||
return httpClient.getProtocolHandlers();
|
||||
}
|
||||
|
@ -213,6 +215,11 @@ public class Http2SolrClient extends SolrClient {
|
|||
|
||||
if (builder.idleTimeout != null) httpClient.setIdleTimeout(builder.idleTimeout);
|
||||
if (builder.connectionTimeout != null) httpClient.setConnectTimeout(builder.connectionTimeout);
|
||||
try {
|
||||
httpClient.start();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return httpClient;
|
||||
}
|
||||
|
||||
|
@ -445,6 +452,7 @@ public class Http2SolrClient extends SolrClient {
|
|||
private Request makeRequest(SolrRequest solrRequest, String collection)
|
||||
throws SolrServerException, IOException {
|
||||
Request req = createRequest(solrRequest, collection);
|
||||
req.header(HttpHeader.ACCEPT_ENCODING, null);
|
||||
setListeners(solrRequest, req);
|
||||
if (solrRequest.getUserPrincipal() != null) {
|
||||
req.attribute(REQ_PRINCIPAL_KEY, solrRequest.getUserPrincipal());
|
||||
|
@ -790,7 +798,7 @@ public class Http2SolrClient extends SolrClient {
|
|||
|
||||
public static class Builder {
|
||||
|
||||
private HttpClient httpClient;
|
||||
private Http2SolrClient http2SolrClient;
|
||||
private SSLConfig sslConfig = defaultSSLConfig;
|
||||
private Integer idleTimeout;
|
||||
private Integer connectionTimeout;
|
||||
|
@ -810,8 +818,11 @@ public class Http2SolrClient extends SolrClient {
|
|||
return new Http2SolrClient(baseSolrUrl, this);
|
||||
}
|
||||
|
||||
public Builder withHttpClient(HttpClient httpClient) {
|
||||
this.httpClient = httpClient;
|
||||
/**
|
||||
* Reuse {@code httpClient} connections pool
|
||||
*/
|
||||
public Builder withHttpClient(Http2SolrClient httpClient) {
|
||||
this.http2SolrClient = httpClient;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -845,52 +856,6 @@ public class Http2SolrClient extends SolrClient {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclass of SolrException that allows us to capture an arbitrary HTTP status code that may have been returned by
|
||||
* the remote server or a proxy along the way.
|
||||
*/
|
||||
public static class RemoteSolrException extends SolrException {
|
||||
/**
|
||||
* @param remoteHost the host the error was received from
|
||||
* @param code Arbitrary HTTP status code
|
||||
* @param msg Exception Message
|
||||
* @param th Throwable to wrap with this Exception
|
||||
*/
|
||||
public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
|
||||
super(code, "Error from server at " + remoteHost + ": " + msg, th);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This should be thrown when a server has an error in executing the request and it sends a proper payload back to the
|
||||
* client
|
||||
*/
|
||||
public static class RemoteExecutionException extends RemoteSolrException {
|
||||
private NamedList meta;
|
||||
|
||||
public RemoteExecutionException(String remoteHost, int code, String msg, NamedList meta) {
|
||||
super(remoteHost, code, msg, null);
|
||||
this.meta = meta;
|
||||
}
|
||||
|
||||
public static RemoteExecutionException create(String host, NamedList errResponse) {
|
||||
Object errObj = errResponse.get("error");
|
||||
if (errObj != null) {
|
||||
Number code = (Number) getObjectByPath(errObj, true, Collections.singletonList("code"));
|
||||
String msg = (String) getObjectByPath(errObj, true, Collections.singletonList("msg"));
|
||||
return new RemoteExecutionException(host, code == null ? ErrorCode.UNKNOWN.code : code.intValue(),
|
||||
msg == null ? "Unknown Error" : msg, errResponse);
|
||||
|
||||
} else {
|
||||
throw new RuntimeException("No error");
|
||||
}
|
||||
}
|
||||
|
||||
public NamedList getMetaData() {
|
||||
return meta;
|
||||
}
|
||||
}
|
||||
|
||||
public Set<String> getQueryParams() {
|
||||
return queryParams;
|
||||
}
|
||||
|
|
|
@ -17,67 +17,25 @@
|
|||
package org.apache.solr.client.solrj.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.common.cloud.Aliases;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ClusterState.CollectionRef;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.SimpleOrderedMap;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class HttpClusterStateProvider implements ClusterStateProvider {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
public class HttpClusterStateProvider extends BaseHttpClusterStateProvider {
|
||||
|
||||
private String urlScheme;
|
||||
volatile Set<String> liveNodes;
|
||||
long liveNodesTimestamp = 0;
|
||||
volatile Map<String, List<String>> aliases;
|
||||
long aliasesTimestamp = 0;
|
||||
|
||||
private int cacheTimeout = 5; // the liveNodes and aliases cache will be invalidated after 5 secs
|
||||
final HttpClient httpClient;
|
||||
final boolean clientIsInternal;
|
||||
private final HttpClient httpClient;
|
||||
private final boolean clientIsInternal;
|
||||
|
||||
public HttpClusterStateProvider(List<String> solrUrls, HttpClient httpClient) throws Exception {
|
||||
this.httpClient = httpClient == null? HttpClientUtil.createClient(null): httpClient;
|
||||
this.clientIsInternal = httpClient == null;
|
||||
for (String solrUrl: solrUrls) {
|
||||
urlScheme = solrUrl.startsWith("https")? "https": "http";
|
||||
try (SolrClient initialClient = new HttpSolrClient.Builder().withBaseSolrUrl(solrUrl).withHttpClient(httpClient).build()) {
|
||||
Set<String> liveNodes = fetchLiveNodes(initialClient); // throws exception if unable to fetch
|
||||
this.liveNodes = liveNodes;
|
||||
liveNodesTimestamp = System.nanoTime();
|
||||
break;
|
||||
} catch (IOException e) {
|
||||
log.warn("Attempt to fetch live_nodes from " + solrUrl + " failed.", e);
|
||||
}
|
||||
}
|
||||
init(solrUrls);
|
||||
}
|
||||
|
||||
if (this.liveNodes == null || this.liveNodes.isEmpty()) {
|
||||
throw new RuntimeException("Tried fetching live_nodes using Solr URLs provided, i.e. " + solrUrls + ". However, "
|
||||
+ "succeeded in obtaining the cluster state from none of them."
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
}
|
||||
@Override
|
||||
protected SolrClient getSolrClient(String baseUrl) {
|
||||
return new HttpSolrClient.Builder().withBaseSolrUrl(baseUrl).withHttpClient(httpClient).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -86,247 +44,4 @@ public class HttpClusterStateProvider implements ClusterStateProvider {
|
|||
HttpClientUtil.close(httpClient);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CollectionRef getState(String collection) {
|
||||
for (String nodeName: liveNodes) {
|
||||
try (HttpSolrClient client = new HttpSolrClient.Builder().
|
||||
withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
|
||||
withHttpClient(httpClient).build()) {
|
||||
ClusterState cs = fetchClusterState(client, collection, null);
|
||||
return cs.getCollectionRef(collection);
|
||||
} catch (SolrServerException | IOException e) {
|
||||
log.warn("Attempt to fetch cluster state from " +
|
||||
Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
|
||||
} catch (RemoteSolrException e) {
|
||||
if ("NOT_FOUND".equals(e.getMetadata("CLUSTERSTATUS"))) {
|
||||
return null;
|
||||
}
|
||||
log.warn("Attempt to fetch cluster state from " +
|
||||
Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
|
||||
} catch (NotACollectionException e) {
|
||||
// Cluster state for the given collection was not found, could be an alias.
|
||||
// Lets fetch/update our aliases:
|
||||
getAliases(true);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
|
||||
+ "succeeded in obtaining the cluster state from none of them."
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
private ClusterState fetchClusterState(SolrClient client, String collection, Map<String, Object> clusterProperties) throws SolrServerException, IOException, NotACollectionException {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
if (collection != null) {
|
||||
params.set("collection", collection);
|
||||
}
|
||||
params.set("action", "CLUSTERSTATUS");
|
||||
QueryRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
|
||||
Map<String, Object> collectionsMap;
|
||||
if (collection != null) {
|
||||
collectionsMap = Collections.singletonMap(collection,
|
||||
((NamedList) cluster.get("collections")).get(collection));
|
||||
} else {
|
||||
collectionsMap = ((NamedList)cluster.get("collections")).asMap(10);
|
||||
}
|
||||
int znodeVersion;
|
||||
Map<String, Object> collFromStatus = (Map<String, Object>) (collectionsMap).get(collection);
|
||||
if (collection != null && collFromStatus == null) {
|
||||
throw new NotACollectionException(); // probably an alias
|
||||
}
|
||||
if (collection != null) { // can be null if alias
|
||||
znodeVersion = (int) collFromStatus.get("znodeVersion");
|
||||
} else {
|
||||
znodeVersion = -1;
|
||||
}
|
||||
Set<String> liveNodes = new HashSet((List<String>)(cluster.get("live_nodes")));
|
||||
this.liveNodes = liveNodes;
|
||||
liveNodesTimestamp = System.nanoTime();
|
||||
//TODO SOLR-11877 we don't know the znode path; CLUSTER_STATE is probably wrong leading to bad stateFormat
|
||||
ClusterState cs = ClusterState.load(znodeVersion, collectionsMap, liveNodes, ZkStateReader.CLUSTER_STATE);
|
||||
if (clusterProperties != null) {
|
||||
Map<String, Object> properties = (Map<String, Object>) cluster.get("properties");
|
||||
if (properties != null) {
|
||||
clusterProperties.putAll(properties);
|
||||
}
|
||||
}
|
||||
return cs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getLiveNodes() {
|
||||
if (liveNodes == null) {
|
||||
throw new RuntimeException("We don't know of any live_nodes to fetch the"
|
||||
+ " latest live_nodes information from. "
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
}
|
||||
if (TimeUnit.SECONDS.convert((System.nanoTime() - liveNodesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
|
||||
for (String nodeName: liveNodes) {
|
||||
try (HttpSolrClient client = new HttpSolrClient.Builder().
|
||||
withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
|
||||
withHttpClient(httpClient).build()) {
|
||||
Set<String> liveNodes = fetchLiveNodes(client);
|
||||
this.liveNodes = (liveNodes);
|
||||
liveNodesTimestamp = System.nanoTime();
|
||||
return liveNodes;
|
||||
} catch (Exception e) {
|
||||
log.warn("Attempt to fetch live_nodes from " +
|
||||
Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("Tried fetching live_nodes using all the node names we knew of, i.e. " + liveNodes +". However, "
|
||||
+ "succeeded in obtaining the cluster state from none of them."
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
} else {
|
||||
return liveNodes; // cached copy is fresh enough
|
||||
}
|
||||
}
|
||||
|
||||
private static Set<String> fetchLiveNodes(SolrClient client) throws Exception {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("action", "CLUSTERSTATUS");
|
||||
QueryRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
|
||||
Set<String> liveNodes = new HashSet((List<String>)(cluster.get("live_nodes")));
|
||||
return liveNodes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> resolveAlias(String aliasName) {
|
||||
return Aliases.resolveAliasesGivenAliasMap(getAliases(false), aliasName);
|
||||
}
|
||||
|
||||
private Map<String, List<String>> getAliases(boolean forceFetch) {
|
||||
if (this.liveNodes == null) {
|
||||
throw new RuntimeException("We don't know of any live_nodes to fetch the"
|
||||
+ " latest aliases information from. "
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
}
|
||||
|
||||
if (forceFetch || this.aliases == null ||
|
||||
TimeUnit.SECONDS.convert((System.nanoTime() - aliasesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
|
||||
for (String nodeName: liveNodes) {
|
||||
try (HttpSolrClient client = new HttpSolrClient.Builder().
|
||||
withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
|
||||
withHttpClient(httpClient).build()) {
|
||||
|
||||
Map<String, List<String>> aliases = new CollectionAdminRequest.ListAliases().process(client).getAliasesAsLists();
|
||||
this.aliases = aliases;
|
||||
this.aliasesTimestamp = System.nanoTime();
|
||||
return Collections.unmodifiableMap(this.aliases);
|
||||
} catch (SolrServerException | RemoteSolrException | IOException e) {
|
||||
// Situation where we're hitting an older Solr which doesn't have LISTALIASES
|
||||
if (e instanceof RemoteSolrException && ((RemoteSolrException)e).code()==400) {
|
||||
log.warn("LISTALIASES not found, possibly using older Solr server. Aliases won't work"
|
||||
+ " unless you re-create the CloudSolrClient using zkHost(s) or upgrade Solr server", e);
|
||||
this.aliases = Collections.emptyMap();
|
||||
this.aliasesTimestamp = System.nanoTime();
|
||||
return aliases;
|
||||
}
|
||||
log.warn("Attempt to fetch cluster state from " +
|
||||
Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
throw new RuntimeException("Tried fetching aliases using all the node names we knew of, i.e. " + liveNodes +". However, "
|
||||
+ "succeeded in obtaining the cluster state from none of them."
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using a working"
|
||||
+ " solrUrl or zkHost.");
|
||||
} else {
|
||||
return Collections.unmodifiableMap(this.aliases); // cached copy is fresh enough
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState getClusterState() throws IOException {
|
||||
for (String nodeName: liveNodes) {
|
||||
try (HttpSolrClient client = new HttpSolrClient.Builder().
|
||||
withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
|
||||
withHttpClient(httpClient).build()) {
|
||||
ClusterState cs = fetchClusterState(client, null, null);
|
||||
return cs;
|
||||
} catch (SolrServerException | RemoteSolrException | IOException e) {
|
||||
log.warn("Attempt to fetch cluster state from " +
|
||||
Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
|
||||
} catch (NotACollectionException e) {
|
||||
// not possible! (we passed in null for collection so it can't be an alias)
|
||||
throw new RuntimeException("null should never cause NotACollectionException in " +
|
||||
"fetchClusterState() Please report this as a bug!");
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
|
||||
+ "succeeded in obtaining the cluster state from none of them."
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getClusterProperties() {
|
||||
for (String nodeName : liveNodes) {
|
||||
try (HttpSolrClient client = new HttpSolrClient.Builder().
|
||||
withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
|
||||
withHttpClient(httpClient).build()) {
|
||||
Map<String, Object> clusterProperties = new HashMap<>();
|
||||
fetchClusterState(client, null, clusterProperties);
|
||||
return clusterProperties;
|
||||
} catch (SolrServerException | RemoteSolrException | IOException e) {
|
||||
log.warn("Attempt to fetch cluster state from " +
|
||||
Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
|
||||
} catch (NotACollectionException e) {
|
||||
// not possible! (we passed in null for collection so it can't be an alias)
|
||||
throw new RuntimeException("null should never cause NotACollectionException in " +
|
||||
"fetchClusterState() Please report this as a bug!");
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes + ". However, "
|
||||
+ "succeeded in obtaining the cluster state from none of them."
|
||||
+ "If you think your Solr cluster is up and is accessible,"
|
||||
+ " you could try re-creating a new CloudSolrClient using working"
|
||||
+ " solrUrl(s) or zkHost(s).");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPolicyNameByCollection(String coll) {
|
||||
throw new UnsupportedOperationException("Fetching cluster properties not supported"
|
||||
+ " using the HttpClusterStateProvider. "
|
||||
+ "ZkClientClusterStateProvider can be used for this."); // TODO
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getClusterProperty(String propertyName) {
|
||||
if (propertyName.equals(ZkStateReader.URL_SCHEME)) {
|
||||
return this.urlScheme;
|
||||
}
|
||||
return getClusterProperties().get(propertyName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect() {}
|
||||
|
||||
public int getCacheTimeout() {
|
||||
return cacheTimeout;
|
||||
}
|
||||
|
||||
public void setCacheTimeout(int cacheTimeout) {
|
||||
this.cacheTimeout = cacheTimeout;
|
||||
}
|
||||
|
||||
// This exception is not meant to escape this class it should be caught and wrapped.
|
||||
private class NotACollectionException extends Exception {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,7 +66,6 @@ import org.apache.http.message.BasicHeader;
|
|||
import org.apache.http.message.BasicNameValuePair;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.solr.client.solrj.ResponseParser;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.V2RequestSupport;
|
||||
|
@ -91,7 +90,7 @@ import static org.apache.solr.common.util.Utils.getObjectByPath;
|
|||
/**
|
||||
* A SolrClient implementation that talks directly to a Solr server via HTTP
|
||||
*/
|
||||
public class HttpSolrClient extends SolrClient {
|
||||
public class HttpSolrClient extends BaseHttpSolrClient {
|
||||
|
||||
private static final String UTF_8 = StandardCharsets.UTF_8.name();
|
||||
private static final String DEFAULT_PATH = "/select";
|
||||
|
@ -560,7 +559,7 @@ public class HttpSolrClient extends SolrClient {
|
|||
} else {
|
||||
contentType = "";
|
||||
}
|
||||
|
||||
|
||||
// handle some http level checks before trying to parse the response
|
||||
switch (httpStatus) {
|
||||
case HttpStatus.SC_OK:
|
||||
|
@ -798,53 +797,26 @@ s * @deprecated since 7.0 Use {@link Builder} methods instead.
|
|||
this.useMultiPartPost = useMultiPartPost;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Subclass of SolrException that allows us to capture an arbitrary HTTP
|
||||
* status code that may have been returned by the remote server or a
|
||||
* proxy along the way.
|
||||
* @deprecated since 8.0, catch {@link BaseHttpSolrClient.RemoteSolrException} instead
|
||||
*/
|
||||
public static class RemoteSolrException extends SolrException {
|
||||
/**
|
||||
* @param remoteHost the host the error was received from
|
||||
* @param code Arbitrary HTTP status code
|
||||
* @param msg Exception Message
|
||||
* @param th Throwable to wrap with this Exception
|
||||
*/
|
||||
@Deprecated
|
||||
public static class RemoteSolrException extends BaseHttpSolrClient.RemoteSolrException {
|
||||
|
||||
public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
|
||||
super(code, "Error from server at " + remoteHost + ": " + msg, th);
|
||||
super(remoteHost, code, msg, th);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This should be thrown when a server has an error in executing the request and
|
||||
* it sends a proper payload back to the client
|
||||
* @deprecated since 8.0, catch {@link BaseHttpSolrClient.RemoteExecutionException} instead
|
||||
*/
|
||||
public static class RemoteExecutionException extends RemoteSolrException {
|
||||
private NamedList meta;
|
||||
@Deprecated
|
||||
public static class RemoteExecutionException extends BaseHttpSolrClient.RemoteExecutionException {
|
||||
|
||||
public RemoteExecutionException(String remoteHost, int code, String msg, NamedList meta) {
|
||||
super(remoteHost, code, msg, null);
|
||||
this.meta = meta;
|
||||
}
|
||||
|
||||
|
||||
public static RemoteExecutionException create(String host, NamedList errResponse) {
|
||||
Object errObj = errResponse.get("error");
|
||||
if (errObj != null) {
|
||||
Number code = (Number) getObjectByPath(errObj, true, Collections.singletonList("code"));
|
||||
String msg = (String) getObjectByPath(errObj, true, Collections.singletonList("msg"));
|
||||
return new RemoteExecutionException(host, code == null ? ErrorCode.UNKNOWN.code : code.intValue(),
|
||||
msg == null ? "Unknown Error" : msg, errResponse);
|
||||
|
||||
} else {
|
||||
throw new RuntimeException("No error");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public NamedList getMetaData() {
|
||||
|
||||
return meta;
|
||||
super(remoteHost, code, msg, meta);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,8 +31,10 @@ import java.util.Objects;
|
|||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.LBSolrClient;
|
||||
import org.apache.solr.client.solrj.response.UpdateResponse;
|
||||
import org.apache.solr.client.solrj.util.ClientUtils;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
|
@ -236,25 +238,21 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
params.set(UpdateParams.COMMIT, "true");
|
||||
return process(client, collection);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param router to route updates with
|
||||
* @param col DocCollection for the updates
|
||||
* @param urlMap of the cluster
|
||||
* @param params params to use
|
||||
* @param idField the id field
|
||||
* @return a Map of urls to requests
|
||||
*/
|
||||
public Map<String,LBHttpSolrClient.Req> getRoutes(DocRouter router,
|
||||
DocCollection col, Map<String,List<String>> urlMap,
|
||||
ModifiableSolrParams params, String idField) {
|
||||
|
||||
|
||||
private interface ReqSupplier<T extends LBSolrClient.Req> {
|
||||
T get(SolrRequest solrRequest, List<String> servers);
|
||||
}
|
||||
|
||||
private <T extends LBSolrClient.Req> Map<String, T> getRoutes(DocRouter router,
|
||||
DocCollection col, Map<String,List<String>> urlMap,
|
||||
ModifiableSolrParams params, String idField,
|
||||
ReqSupplier<T> reqSupplier) {
|
||||
if ((documents == null || documents.size() == 0)
|
||||
&& (deleteById == null || deleteById.size() == 0)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Map<String,LBHttpSolrClient.Req> routes = new HashMap<>();
|
||||
|
||||
Map<String,T> routes = new HashMap<>();
|
||||
if (documents != null) {
|
||||
Set<Entry<SolrInputDocument,Map<String,Object>>> entries = documents.entrySet();
|
||||
for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
|
||||
|
@ -273,7 +271,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
return null;
|
||||
}
|
||||
String leaderUrl = urls.get(0);
|
||||
LBHttpSolrClient.Req request = routes
|
||||
T request = routes
|
||||
.get(leaderUrl);
|
||||
if (request == null) {
|
||||
UpdateRequest updateRequest = new UpdateRequest();
|
||||
|
@ -283,7 +281,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
updateRequest.setPath(getPath());
|
||||
updateRequest.setBasicAuthCredentials(getBasicAuthUser(), getBasicAuthPassword());
|
||||
updateRequest.setResponseParser(getResponseParser());
|
||||
request = new LBHttpSolrClient.Req(updateRequest, urls);
|
||||
request = reqSupplier.get(updateRequest, urls);
|
||||
routes.put(leaderUrl, request);
|
||||
}
|
||||
UpdateRequest urequest = (UpdateRequest) request.getRequest();
|
||||
|
@ -299,17 +297,17 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Route the deleteById's
|
||||
|
||||
|
||||
if (deleteById != null) {
|
||||
|
||||
|
||||
Iterator<Map.Entry<String,Map<String,Object>>> entries = deleteById.entrySet()
|
||||
.iterator();
|
||||
while (entries.hasNext()) {
|
||||
|
||||
|
||||
Map.Entry<String,Map<String,Object>> entry = entries.next();
|
||||
|
||||
|
||||
String deleteId = entry.getKey();
|
||||
Map<String,Object> map = entry.getValue();
|
||||
Long version = null;
|
||||
|
@ -325,7 +323,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
return null;
|
||||
}
|
||||
String leaderUrl = urls.get(0);
|
||||
LBHttpSolrClient.Req request = routes.get(leaderUrl);
|
||||
T request = routes.get(leaderUrl);
|
||||
if (request != null) {
|
||||
UpdateRequest urequest = (UpdateRequest) request.getRequest();
|
||||
urequest.deleteById(deleteId, version);
|
||||
|
@ -335,7 +333,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
urequest.deleteById(deleteId, version);
|
||||
urequest.setCommitWithin(getCommitWithin());
|
||||
urequest.setBasicAuthCredentials(getBasicAuthUser(), getBasicAuthPassword());
|
||||
request = new LBHttpSolrClient.Req(urequest, urls);
|
||||
request = reqSupplier.get(urequest, urls);
|
||||
routes.put(leaderUrl, request);
|
||||
}
|
||||
}
|
||||
|
@ -344,6 +342,36 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
return routes;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param router to route updates with
|
||||
* @param col DocCollection for the updates
|
||||
* @param urlMap of the cluster
|
||||
* @param params params to use
|
||||
* @param idField the id field
|
||||
* @return a Map of urls to requests
|
||||
*/
|
||||
public Map<String, LBSolrClient.Req> getRoutesToCollection(DocRouter router,
|
||||
DocCollection col, Map<String,List<String>> urlMap,
|
||||
ModifiableSolrParams params, String idField) {
|
||||
return getRoutes(router, col, urlMap, params, idField, LBSolrClient.Req::new);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param router to route updates with
|
||||
* @param col DocCollection for the updates
|
||||
* @param urlMap of the cluster
|
||||
* @param params params to use
|
||||
* @param idField the id field
|
||||
* @return a Map of urls to requests
|
||||
* @deprecated since 8.0, uses {@link #getRoutesToCollection(DocRouter, DocCollection, Map, ModifiableSolrParams, String)} instead
|
||||
*/
|
||||
@Deprecated
|
||||
public Map<String,LBHttpSolrClient.Req> getRoutes(DocRouter router,
|
||||
DocCollection col, Map<String,List<String>> urlMap,
|
||||
ModifiableSolrParams params, String idField) {
|
||||
return getRoutes(router, col, urlMap, params, idField, LBHttpSolrClient.Req::new);
|
||||
}
|
||||
|
||||
public void setDocIterator(Iterator<SolrInputDocument> docIterator) {
|
||||
this.docIterator = docIterator;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
|
||||
public class CloudHttp2SolrClientBadInputTest extends SolrCloudTestCase {
|
||||
private static final List<String> NULL_STR_LIST = null;
|
||||
private static final List<String> EMPTY_STR_LIST = new ArrayList<>();
|
||||
private static final String ANY_COLLECTION = "ANY_COLLECTION";
|
||||
private static final int ANY_COMMIT_WITHIN_TIME = -1;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(1)
|
||||
.configure();
|
||||
|
||||
final List<String> solrUrls = new ArrayList<>();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteByIdReportsInvalidIdLists() throws Exception {
|
||||
try (SolrClient client = getCloudHttp2SolrClient(cluster)) {
|
||||
assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "null"), () -> {
|
||||
client.deleteById(ANY_COLLECTION, NULL_STR_LIST);
|
||||
});
|
||||
assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "empty"), () -> {
|
||||
client.deleteById(ANY_COLLECTION, EMPTY_STR_LIST);
|
||||
});
|
||||
assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "null"), () -> {
|
||||
client.deleteById(ANY_COLLECTION, NULL_STR_LIST, ANY_COMMIT_WITHIN_TIME);
|
||||
});
|
||||
assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "empty"), () -> {
|
||||
client.deleteById(ANY_COLLECTION, EMPTY_STR_LIST, ANY_COMMIT_WITHIN_TIME);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void assertExceptionThrownWithMessageContaining(Class expectedType, List<String> expectedStrings, LuceneTestCase.ThrowingRunnable runnable) {
|
||||
Throwable thrown = expectThrows(expectedType, runnable);
|
||||
|
||||
if (expectedStrings != null) {
|
||||
for (String expectedString : expectedStrings) {
|
||||
assertThat(thrown.getMessage(), containsString(expectedString));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.junit.Test;
|
||||
|
||||
public class CloudHttp2SolrClientBuilderTest extends LuceneTestCase {
|
||||
private static final String ANY_CHROOT = "/ANY_CHROOT";
|
||||
private static final String ANY_ZK_HOST = "ANY_ZK_HOST";
|
||||
private static final String ANY_OTHER_ZK_HOST = "ANY_OTHER_ZK_HOST";
|
||||
|
||||
@Test
|
||||
public void testSingleZkHostSpecified() throws IOException {
|
||||
try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(ANY_ZK_HOST), Optional.of(ANY_CHROOT))
|
||||
.build()) {
|
||||
final String clientZkHost = createdClient.getZkHost();
|
||||
|
||||
assertTrue(clientZkHost.contains(ANY_ZK_HOST));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeveralZkHostsSpecifiedSingly() throws IOException {
|
||||
final List<String> zkHostList = new ArrayList<>();
|
||||
zkHostList.add(ANY_ZK_HOST); zkHostList.add(ANY_OTHER_ZK_HOST);
|
||||
try (CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(zkHostList, Optional.of(ANY_CHROOT))
|
||||
.build()) {
|
||||
final String clientZkHost = createdClient.getZkHost();
|
||||
|
||||
assertTrue(clientZkHost.contains(ANY_ZK_HOST));
|
||||
assertTrue(clientZkHost.contains(ANY_OTHER_ZK_HOST));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeveralZkHostsSpecifiedTogether() throws IOException {
|
||||
final ArrayList<String> zkHosts = new ArrayList<String>();
|
||||
zkHosts.add(ANY_ZK_HOST);
|
||||
zkHosts.add(ANY_OTHER_ZK_HOST);
|
||||
try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(zkHosts, Optional.of(ANY_CHROOT)).build()) {
|
||||
final String clientZkHost = createdClient.getZkHost();
|
||||
|
||||
assertTrue(clientZkHost.contains(ANY_ZK_HOST));
|
||||
assertTrue(clientZkHost.contains(ANY_OTHER_ZK_HOST));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByDefaultConfiguresClientToSendUpdatesOnlyToShardLeaders() throws IOException {
|
||||
try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(ANY_ZK_HOST), Optional.of(ANY_CHROOT)).build()) {
|
||||
assertTrue(createdClient.isUpdatesToLeaders() == true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsDirectUpdatesToLeadersOnlyDefault() throws IOException {
|
||||
try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(ANY_ZK_HOST), Optional.of(ANY_CHROOT)).build()) {
|
||||
assertFalse(createdClient.isDirectUpdatesToLeadersOnly());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.client.solrj.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
import org.junit.Test;
|
||||
|
||||
public class CloudHttp2SolrClientMultiConstructorTest extends LuceneTestCase {
|
||||
|
||||
/*
|
||||
* NOTE: If you only include one String argument, it will NOT use the
|
||||
* constructor with the variable argument list, which is the one that
|
||||
* we are testing here.
|
||||
*/
|
||||
Collection<String> hosts;
|
||||
|
||||
@Test
|
||||
// commented out on: 24-Dec-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 20-Sep-2018
|
||||
public void testZkConnectionStringConstructorWithValidChroot() throws IOException {
|
||||
boolean setOrList = random().nextBoolean();
|
||||
int numOfZKServers = TestUtil.nextInt(random(), 1, 5);
|
||||
boolean withChroot = random().nextBoolean();
|
||||
|
||||
final String chroot = "/mychroot";
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
if(setOrList) {
|
||||
/*
|
||||
A LinkedHashSet is required here for testing, or we can't guarantee
|
||||
the order of entries in the final string.
|
||||
*/
|
||||
hosts = new LinkedHashSet<>();
|
||||
} else {
|
||||
hosts = new ArrayList<>();
|
||||
}
|
||||
|
||||
for(int i=0; i<numOfZKServers; i++) {
|
||||
String ZKString = "host" + i + ":2181";
|
||||
hosts.add(ZKString);
|
||||
sb.append(ZKString);
|
||||
if(i<numOfZKServers -1) sb.append(",");
|
||||
}
|
||||
|
||||
String clientChroot = null;
|
||||
if (withChroot) {
|
||||
sb.append(chroot);
|
||||
clientChroot = "/mychroot";
|
||||
}
|
||||
|
||||
try (CloudHttp2SolrClient client = new CloudHttp2SolrClient.Builder(new ArrayList<>(hosts), Optional.ofNullable(clientChroot)).build()) {
|
||||
assertEquals(sb.toString(), client.getZkHost());
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
// commented out on: 24-Dec-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 20-Sep-2018
|
||||
public void testBadChroot() {
|
||||
final List<String> zkHosts = new ArrayList<>();
|
||||
zkHosts.add("host1:2181");
|
||||
new CloudHttp2SolrClient.Builder(zkHosts, Optional.of("foo")).build();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.impl;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.util.TestInjection;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class CloudHttp2SolrClientRetryTest extends SolrCloudTestCase {
|
||||
private static final int NODE_COUNT = 1;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(NODE_COUNT)
|
||||
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
|
||||
.configure();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetry() throws Exception {
|
||||
String collectionName = "testRetry";
|
||||
try (CloudHttp2SolrClient solrClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty()).build()) {
|
||||
CollectionAdminRequest.createCollection(collectionName, 1, 1)
|
||||
.process(solrClient);
|
||||
|
||||
solrClient.add(collectionName, new SolrInputDocument("id", "1"));
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CommonParams.QT, "/admin/metrics");
|
||||
String updateRequestCountKey = "solr.core.testRetry.shard1.replica_n1:UPDATE./update.requestTimes:count";
|
||||
params.set("key", updateRequestCountKey);
|
||||
params.set("indent", "true");
|
||||
|
||||
QueryResponse response = solrClient.query(collectionName, params, SolrRequest.METHOD.GET);
|
||||
NamedList<Object> namedList = response.getResponse();
|
||||
System.out.println(namedList);
|
||||
NamedList metrics = (NamedList) namedList.get("metrics");
|
||||
assertEquals(1L, metrics.get(updateRequestCountKey));
|
||||
|
||||
TestInjection.failUpdateRequests = "true:100";
|
||||
try {
|
||||
expectThrows(BaseCloudSolrClient.RouteException.class,
|
||||
"Expected an exception on the client when failure is injected during updates", () -> {
|
||||
solrClient.add(collectionName, new SolrInputDocument("id", "2"));
|
||||
});
|
||||
} finally {
|
||||
TestInjection.reset();
|
||||
}
|
||||
|
||||
response = solrClient.query(collectionName, params, SolrRequest.METHOD.GET);
|
||||
namedList = response.getResponse();
|
||||
System.out.println(namedList);
|
||||
metrics = (NamedList) namedList.get("metrics");
|
||||
assertEquals(2L, metrics.get(updateRequestCountKey));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,978 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.client.solrj.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.client.solrj.response.RequestStatusState;
|
||||
import org.apache.solr.client.solrj.response.UpdateResponse;
|
||||
import org.apache.solr.cloud.AbstractDistribZkTestBase;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.SolrDocument;
|
||||
import org.apache.solr.common.SolrDocumentList;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.ShardParams;
|
||||
import org.apache.solr.common.params.UpdateParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.SimpleOrderedMap;
|
||||
import org.apache.solr.handler.admin.CollectionsHandler;
|
||||
import org.apache.solr.handler.admin.ConfigSetsHandler;
|
||||
import org.apache.solr.handler.admin.CoreAdminHandler;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.client.solrj.impl.BaseCloudSolrClient.*;
|
||||
|
||||
|
||||
/**
|
||||
* This test would be faster if we simulated the zk state instead.
|
||||
*/
|
||||
@Slow
|
||||
public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static final String COLLECTION = "collection1";
|
||||
private static final String COLLECTION2 = "2nd_collection";
|
||||
|
||||
private static final String id = "id";
|
||||
|
||||
private static final int TIMEOUT = 30;
|
||||
private static final int NODE_COUNT = 3;
|
||||
|
||||
private static CloudHttp2SolrClient httpBasedCloudSolrClient = null;
|
||||
private static CloudHttp2SolrClient zkBasedCloudSolrClient = null;
|
||||
|
||||
@Before
|
||||
public void setupCluster() throws Exception {
|
||||
configureCluster(NODE_COUNT)
|
||||
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
|
||||
.configure();
|
||||
|
||||
final List<String> solrUrls = new ArrayList<>();
|
||||
solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
|
||||
httpBasedCloudSolrClient = new CloudHttp2SolrClient.Builder(solrUrls).build();
|
||||
zkBasedCloudSolrClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty()).build();
|
||||
}
|
||||
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (httpBasedCloudSolrClient != null) {
|
||||
try {
|
||||
httpBasedCloudSolrClient.close();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
if (zkBasedCloudSolrClient != null) {
|
||||
try {
|
||||
zkBasedCloudSolrClient.close();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
shutdownCluster();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Randomly return the cluster's ZK based CSC, or HttpClusterProvider based CSC.
|
||||
*/
|
||||
private CloudHttp2SolrClient getRandomClient() {
|
||||
// return random().nextBoolean()? zkBasedCloudSolrClient : httpBasedCloudSolrClient;
|
||||
return httpBasedCloudSolrClient;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParallelUpdateQTime() throws Exception {
|
||||
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
|
||||
cluster.waitForActiveCollection(COLLECTION, 2, 2);
|
||||
UpdateRequest req = new UpdateRequest();
|
||||
for (int i=0; i<10; i++) {
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
doc.addField("id", String.valueOf(TestUtil.nextInt(random(), 1000, 1100)));
|
||||
req.add(doc);
|
||||
}
|
||||
UpdateResponse response = req.process(getRandomClient(), COLLECTION);
|
||||
// See SOLR-6547, we just need to ensure that no exception is thrown here
|
||||
assertTrue(response.getQTime() >= 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOverwriteOption() throws Exception {
|
||||
|
||||
CollectionAdminRequest.createCollection("overwrite", "conf", 1, 1)
|
||||
.processAndWait(cluster.getSolrClient(), TIMEOUT);
|
||||
cluster.waitForActiveCollection("overwrite", 1, 1);
|
||||
|
||||
new UpdateRequest()
|
||||
.add("id", "0", "a_t", "hello1")
|
||||
.add("id", "0", "a_t", "hello2")
|
||||
.commit(cluster.getSolrClient(), "overwrite");
|
||||
|
||||
QueryResponse resp = cluster.getSolrClient().query("overwrite", new SolrQuery("*:*"));
|
||||
assertEquals("There should be one document because overwrite=true", 1, resp.getResults().getNumFound());
|
||||
|
||||
new UpdateRequest()
|
||||
.add(new SolrInputDocument(id, "1", "a_t", "hello1"), /* overwrite = */ false)
|
||||
.add(new SolrInputDocument(id, "1", "a_t", "hello2"), false)
|
||||
.commit(cluster.getSolrClient(), "overwrite");
|
||||
|
||||
resp = getRandomClient().query("overwrite", new SolrQuery("*:*"));
|
||||
assertEquals("There should be 3 documents because there should be two id=1 docs due to overwrite=false", 3, resp.getResults().getNumFound());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAliasHandling() throws Exception {
|
||||
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
|
||||
cluster.waitForActiveCollection(COLLECTION, 2, 2);
|
||||
|
||||
CollectionAdminRequest.createCollection(COLLECTION2, "conf", 2, 1).process(cluster.getSolrClient());
|
||||
cluster.waitForActiveCollection(COLLECTION2, 2, 2);
|
||||
|
||||
CloudHttp2SolrClient client = getRandomClient();
|
||||
SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
|
||||
client.add(COLLECTION, doc);
|
||||
client.commit(COLLECTION);
|
||||
CollectionAdminRequest.createAlias("testalias", COLLECTION).process(cluster.getSolrClient());
|
||||
|
||||
SolrInputDocument doc2 = new SolrInputDocument("id", "2", "title_s", "my doc too");
|
||||
client.add(COLLECTION2, doc2);
|
||||
client.commit(COLLECTION2);
|
||||
CollectionAdminRequest.createAlias("testalias2", COLLECTION2).process(cluster.getSolrClient());
|
||||
|
||||
CollectionAdminRequest.createAlias("testaliascombined", COLLECTION + "," + COLLECTION2).process(cluster.getSolrClient());
|
||||
|
||||
// ensure that the aliases have been registered
|
||||
Map<String, String> aliases = new CollectionAdminRequest.ListAliases().process(cluster.getSolrClient()).getAliases();
|
||||
assertEquals(COLLECTION, aliases.get("testalias"));
|
||||
assertEquals(COLLECTION2, aliases.get("testalias2"));
|
||||
assertEquals(COLLECTION + "," + COLLECTION2, aliases.get("testaliascombined"));
|
||||
|
||||
assertEquals(1, client.query(COLLECTION, params("q", "*:*")).getResults().getNumFound());
|
||||
assertEquals(1, client.query("testalias", params("q", "*:*")).getResults().getNumFound());
|
||||
|
||||
assertEquals(1, client.query(COLLECTION2, params("q", "*:*")).getResults().getNumFound());
|
||||
assertEquals(1, client.query("testalias2", params("q", "*:*")).getResults().getNumFound());
|
||||
|
||||
assertEquals(2, client.query("testaliascombined", params("q", "*:*")).getResults().getNumFound());
|
||||
|
||||
ModifiableSolrParams paramsWithBothCollections = params("q", "*:*", "collection", COLLECTION + "," + COLLECTION2);
|
||||
assertEquals(2, client.query(null, paramsWithBothCollections).getResults().getNumFound());
|
||||
|
||||
ModifiableSolrParams paramsWithBothAliases = params("q", "*:*", "collection", "testalias,testalias2");
|
||||
assertEquals(2, client.query(null, paramsWithBothAliases).getResults().getNumFound());
|
||||
|
||||
ModifiableSolrParams paramsWithCombinedAlias = params("q", "*:*", "collection", "testaliascombined");
|
||||
assertEquals(2, client.query(null, paramsWithCombinedAlias).getResults().getNumFound());
|
||||
|
||||
ModifiableSolrParams paramsWithMixedCollectionAndAlias = params("q", "*:*", "collection", "testalias," + COLLECTION2);
|
||||
assertEquals(2, client.query(null, paramsWithMixedCollectionAndAlias).getResults().getNumFound());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRouting() throws Exception {
|
||||
CollectionAdminRequest.createCollection("routing_collection", "conf", 2, 1).process(cluster.getSolrClient());
|
||||
cluster.waitForActiveCollection("routing_collection", 2, 2);
|
||||
|
||||
AbstractUpdateRequest request = new UpdateRequest()
|
||||
.add(id, "0", "a_t", "hello1")
|
||||
.add(id, "2", "a_t", "hello2")
|
||||
.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
|
||||
|
||||
// Test single threaded routed updates for UpdateRequest
|
||||
NamedList<Object> response = getRandomClient().request(request, "routing_collection");
|
||||
if (getRandomClient().isDirectUpdatesToLeadersOnly()) {
|
||||
checkSingleServer(response);
|
||||
}
|
||||
RouteResponse rr = (RouteResponse) response;
|
||||
Map<String,LBSolrClient.Req> routes = rr.getRoutes();
|
||||
Iterator<Map.Entry<String,LBSolrClient.Req>> it = routes.entrySet()
|
||||
.iterator();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<String,LBSolrClient.Req> entry = it.next();
|
||||
String url = entry.getKey();
|
||||
UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
|
||||
.getRequest();
|
||||
SolrInputDocument doc = updateRequest.getDocuments().get(0);
|
||||
String id = doc.getField("id").getValue().toString();
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.add("q", "id:" + id);
|
||||
params.add("distrib", "false");
|
||||
QueryRequest queryRequest = new QueryRequest(params);
|
||||
try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
|
||||
QueryResponse queryResponse = queryRequest.process(solrClient);
|
||||
SolrDocumentList docList = queryResponse.getResults();
|
||||
assertTrue(docList.getNumFound() == 1);
|
||||
}
|
||||
}
|
||||
|
||||
// Test the deleteById routing for UpdateRequest
|
||||
|
||||
final UpdateResponse uResponse = new UpdateRequest()
|
||||
.deleteById("0")
|
||||
.deleteById("2")
|
||||
.commit(cluster.getSolrClient(), "routing_collection");
|
||||
if (getRandomClient().isDirectUpdatesToLeadersOnly()) {
|
||||
checkSingleServer(uResponse.getResponse());
|
||||
}
|
||||
|
||||
QueryResponse qResponse = getRandomClient().query("routing_collection", new SolrQuery("*:*"));
|
||||
SolrDocumentList docs = qResponse.getResults();
|
||||
assertEquals(0, docs.getNumFound());
|
||||
|
||||
// Test Multi-Threaded routed updates for UpdateRequest
|
||||
try (CloudSolrClient threadedClient = new CloudSolrClientBuilder
|
||||
(Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
|
||||
.withParallelUpdates(true)
|
||||
.build()) {
|
||||
threadedClient.setDefaultCollection("routing_collection");
|
||||
response = threadedClient.request(request);
|
||||
if (threadedClient.isDirectUpdatesToLeadersOnly()) {
|
||||
checkSingleServer(response);
|
||||
}
|
||||
rr = (RouteResponse) response;
|
||||
routes = rr.getRoutes();
|
||||
it = routes.entrySet()
|
||||
.iterator();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<String,LBSolrClient.Req> entry = it.next();
|
||||
String url = entry.getKey();
|
||||
UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
|
||||
.getRequest();
|
||||
SolrInputDocument doc = updateRequest.getDocuments().get(0);
|
||||
String id = doc.getField("id").getValue().toString();
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.add("q", "id:" + id);
|
||||
params.add("distrib", "false");
|
||||
QueryRequest queryRequest = new QueryRequest(params);
|
||||
try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
|
||||
QueryResponse queryResponse = queryRequest.process(solrClient);
|
||||
SolrDocumentList docList = queryResponse.getResults();
|
||||
assertTrue(docList.getNumFound() == 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test that queries with _route_ params are routed by the client
|
||||
|
||||
// Track request counts on each node before query calls
|
||||
ClusterState clusterState = cluster.getSolrClient().getZkStateReader().getClusterState();
|
||||
DocCollection col = clusterState.getCollection("routing_collection");
|
||||
Map<String, Long> requestCountsMap = Maps.newHashMap();
|
||||
for (Slice slice : col.getSlices()) {
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
|
||||
requestCountsMap.put(baseURL, getNumRequests(baseURL, "routing_collection"));
|
||||
}
|
||||
}
|
||||
|
||||
// Collect the base URLs of the replicas of shard that's expected to be hit
|
||||
DocRouter router = col.getRouter();
|
||||
Collection<Slice> expectedSlices = router.getSearchSlicesSingle("0", null, col);
|
||||
Set<String> expectedBaseURLs = Sets.newHashSet();
|
||||
for (Slice expectedSlice : expectedSlices) {
|
||||
for (Replica replica : expectedSlice.getReplicas()) {
|
||||
String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
|
||||
expectedBaseURLs.add(baseURL);
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue("expected urls is not fewer than all urls! expected=" + expectedBaseURLs
|
||||
+ "; all=" + requestCountsMap.keySet(),
|
||||
expectedBaseURLs.size() < requestCountsMap.size());
|
||||
|
||||
// Calculate a number of shard keys that route to the same shard.
|
||||
int n;
|
||||
if (TEST_NIGHTLY) {
|
||||
n = random().nextInt(999) + 2;
|
||||
} else {
|
||||
n = random().nextInt(9) + 2;
|
||||
}
|
||||
|
||||
List<String> sameShardRoutes = Lists.newArrayList();
|
||||
sameShardRoutes.add("0");
|
||||
for (int i = 1; i < n; i++) {
|
||||
String shardKey = Integer.toString(i);
|
||||
Collection<Slice> slices = router.getSearchSlicesSingle(shardKey, null, col);
|
||||
log.info("Expected Slices {}", slices);
|
||||
if (expectedSlices.equals(slices)) {
|
||||
sameShardRoutes.add(shardKey);
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue(sameShardRoutes.size() > 1);
|
||||
|
||||
// Do N queries with _route_ parameter to the same shard
|
||||
for (int i = 0; i < n; i++) {
|
||||
ModifiableSolrParams solrParams = new ModifiableSolrParams();
|
||||
solrParams.set(CommonParams.Q, "*:*");
|
||||
solrParams.set(ShardParams._ROUTE_, sameShardRoutes.get(random().nextInt(sameShardRoutes.size())));
|
||||
log.info("output: {}", getRandomClient().query("routing_collection", solrParams));
|
||||
}
|
||||
|
||||
// Request counts increase from expected nodes should aggregate to 1000, while there should be
|
||||
// no increase in unexpected nodes.
|
||||
int increaseFromExpectedUrls = 0;
|
||||
int increaseFromUnexpectedUrls = 0;
|
||||
Map<String, Long> numRequestsToUnexpectedUrls = Maps.newHashMap();
|
||||
for (Slice slice : col.getSlices()) {
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
|
||||
|
||||
Long prevNumRequests = requestCountsMap.get(baseURL);
|
||||
Long curNumRequests = getNumRequests(baseURL, "routing_collection");
|
||||
|
||||
long delta = curNumRequests - prevNumRequests;
|
||||
if (expectedBaseURLs.contains(baseURL)) {
|
||||
increaseFromExpectedUrls += delta;
|
||||
} else {
|
||||
increaseFromUnexpectedUrls += delta;
|
||||
numRequestsToUnexpectedUrls.put(baseURL, delta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals("Unexpected number of requests to expected URLs", n, increaseFromExpectedUrls);
|
||||
assertEquals("Unexpected number of requests to unexpected URLs: " + numRequestsToUnexpectedUrls,
|
||||
0, increaseFromUnexpectedUrls);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests if the specification of 'preferLocalShards' in the query-params
|
||||
* limits the distributed query to locally hosted shards only
|
||||
*/
|
||||
@Test
|
||||
// commented 4-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
|
||||
public void preferLocalShardsTest() throws Exception {
|
||||
|
||||
String collectionName = "localShardsTestColl";
|
||||
|
||||
int liveNodes = cluster.getJettySolrRunners().size();
|
||||
|
||||
// For preferLocalShards to succeed in a test, every shard should have
|
||||
// all its cores on the same node.
|
||||
// Hence the below configuration for our collection
|
||||
CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, liveNodes)
|
||||
.setMaxShardsPerNode(liveNodes * liveNodes)
|
||||
.processAndWait(cluster.getSolrClient(), TIMEOUT);
|
||||
cluster.waitForActiveCollection(collectionName, liveNodes, liveNodes * liveNodes);
|
||||
// Add some new documents
|
||||
new UpdateRequest()
|
||||
.add(id, "0", "a_t", "hello1")
|
||||
.add(id, "2", "a_t", "hello2")
|
||||
.add(id, "3", "a_t", "hello2")
|
||||
.commit(getRandomClient(), collectionName);
|
||||
|
||||
// Run the actual test for 'preferLocalShards'
|
||||
queryWithShardsPreferenceRules(getRandomClient(), false, collectionName);
|
||||
queryWithShardsPreferenceRules(getRandomClient(), true, collectionName);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private void queryWithShardsPreferenceRules(CloudHttp2SolrClient cloudClient,
|
||||
boolean useShardsPreference,
|
||||
String collectionName)
|
||||
throws Exception
|
||||
{
|
||||
SolrQuery qRequest = new SolrQuery("*:*");
|
||||
|
||||
ModifiableSolrParams qParams = new ModifiableSolrParams();
|
||||
if (useShardsPreference) {
|
||||
qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":" + ShardParams.REPLICA_LOCAL);
|
||||
} else {
|
||||
qParams.add(CommonParams.PREFER_LOCAL_SHARDS, "true");
|
||||
}
|
||||
qParams.add(ShardParams.SHARDS_INFO, "true");
|
||||
qRequest.add(qParams);
|
||||
|
||||
// CloudSolrClient sends the request to some node.
|
||||
// And since all the nodes are hosting cores from all shards, the
|
||||
// distributed query formed by this node will select cores from the
|
||||
// local shards only
|
||||
QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
|
||||
|
||||
Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
|
||||
assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
|
||||
|
||||
// Iterate over shards-info and check what cores responded
|
||||
SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
|
||||
Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
|
||||
List<String> shardAddresses = new ArrayList<String>();
|
||||
while (itr.hasNext()) {
|
||||
Map.Entry<String, ?> e = itr.next();
|
||||
assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
|
||||
String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
|
||||
assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
|
||||
shardAddresses.add(shardAddress);
|
||||
}
|
||||
log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
|
||||
|
||||
// Make sure the distributed queries were directed to a single node only
|
||||
Set<Integer> ports = new HashSet<Integer>();
|
||||
for (String shardAddr: shardAddresses) {
|
||||
URL url = new URL (shardAddr);
|
||||
ports.add(url.getPort());
|
||||
}
|
||||
|
||||
// This assertion would hold true as long as every shard has a core on each node
|
||||
assertTrue ("Response was not received from shards on a single node",
|
||||
shardAddresses.size() > 1 && ports.size()==1);
|
||||
}
|
||||
|
||||
private Long getNumRequests(String baseUrl, String collectionName) throws
|
||||
SolrServerException, IOException {
|
||||
return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false);
|
||||
}
|
||||
|
||||
private Long getNumRequests(String baseUrl, String collectionName, String category, String key, String scope, boolean returnNumErrors) throws
|
||||
SolrServerException, IOException {
|
||||
|
||||
NamedList<Object> resp;
|
||||
try (HttpSolrClient client = getHttpSolrClient(baseUrl + "/"+ collectionName, 15000, 60000)) {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("qt", "/admin/mbeans");
|
||||
params.set("stats", "true");
|
||||
params.set("key", key);
|
||||
params.set("cat", category);
|
||||
// use generic request to avoid extra processing of queries
|
||||
QueryRequest req = new QueryRequest(params);
|
||||
resp = client.request(req);
|
||||
}
|
||||
String name;
|
||||
if (returnNumErrors) {
|
||||
name = category + "." + (scope != null ? scope : key) + ".errors";
|
||||
} else {
|
||||
name = category + "." + (scope != null ? scope : key) + ".requests";
|
||||
}
|
||||
Map<String,Object> map = (Map<String,Object>)resp.findRecursive("solr-mbeans", category, key, "stats");
|
||||
if (map == null) {
|
||||
return null;
|
||||
}
|
||||
if (scope != null) { // admin handler uses a meter instead of counter here
|
||||
return (Long)map.get(name + ".count");
|
||||
} else {
|
||||
return (Long) map.get(name);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonRetryableRequests() throws Exception {
|
||||
try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
|
||||
// important to have one replica on each node
|
||||
RequestStatusState state = CollectionAdminRequest.createCollection("foo", "conf", 1, NODE_COUNT).processAndWait(client, 60);
|
||||
if (state == RequestStatusState.COMPLETED) {
|
||||
cluster.waitForActiveCollection("foo", 1, NODE_COUNT);
|
||||
client.setDefaultCollection("foo");
|
||||
|
||||
Map<String, String> adminPathToMbean = new HashMap<>(CommonParams.ADMIN_PATHS.size());
|
||||
adminPathToMbean.put(CommonParams.COLLECTIONS_HANDLER_PATH, CollectionsHandler.class.getName());
|
||||
adminPathToMbean.put(CommonParams.CORES_HANDLER_PATH, CoreAdminHandler.class.getName());
|
||||
adminPathToMbean.put(CommonParams.CONFIGSETS_HANDLER_PATH, ConfigSetsHandler.class.getName());
|
||||
// we do not add the authc/authz handlers because they do not currently expose any mbeans
|
||||
|
||||
for (String adminPath : adminPathToMbean.keySet()) {
|
||||
long errorsBefore = 0;
|
||||
for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
|
||||
Long numRequests = getNumRequests(runner.getBaseUrl().toString(), "foo", "ADMIN", adminPathToMbean.get(adminPath), adminPath, true);
|
||||
errorsBefore += numRequests;
|
||||
log.info("Found {} requests to {} on {}", numRequests, adminPath, runner.getBaseUrl());
|
||||
}
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("qt", adminPath);
|
||||
params.set("action", "foobar"); // this should cause an error
|
||||
QueryRequest req = new QueryRequest(params);
|
||||
try {
|
||||
NamedList<Object> resp = client.request(req);
|
||||
fail("call to foo for admin path " + adminPath + " should have failed");
|
||||
} catch (Exception e) {
|
||||
// expected
|
||||
}
|
||||
long errorsAfter = 0;
|
||||
for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
|
||||
Long numRequests = getNumRequests(runner.getBaseUrl().toString(), "foo", "ADMIN", adminPathToMbean.get(adminPath), adminPath, true);
|
||||
errorsAfter += numRequests;
|
||||
log.info("Found {} requests to {} on {}", numRequests, adminPath, runner.getBaseUrl());
|
||||
}
|
||||
assertEquals(errorsBefore + 1, errorsAfter);
|
||||
}
|
||||
} else {
|
||||
fail("Collection could not be created within 60 seconds");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkCollectionParameters() throws Exception {
|
||||
|
||||
try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
|
||||
|
||||
String async1 = CollectionAdminRequest.createCollection("multicollection1", "conf", 2, 1)
|
||||
.processAsync(client);
|
||||
String async2 = CollectionAdminRequest.createCollection("multicollection2", "conf", 2, 1)
|
||||
.processAsync(client);
|
||||
|
||||
CollectionAdminRequest.waitForAsyncRequest(async1, client, TIMEOUT);
|
||||
CollectionAdminRequest.waitForAsyncRequest(async2, client, TIMEOUT);
|
||||
cluster.waitForActiveCollection("multicollection1", 2, 2);
|
||||
cluster.waitForActiveCollection("multicollection2", 2, 2);
|
||||
client.setDefaultCollection("multicollection1");
|
||||
|
||||
List<SolrInputDocument> docs = new ArrayList<>(3);
|
||||
for (int i = 0; i < 3; i++) {
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
doc.addField(id, Integer.toString(i));
|
||||
doc.addField("a_t", "hello");
|
||||
docs.add(doc);
|
||||
}
|
||||
|
||||
client.add(docs); // default - will add them to multicollection1
|
||||
client.commit();
|
||||
|
||||
ModifiableSolrParams queryParams = new ModifiableSolrParams();
|
||||
queryParams.add("q", "*:*");
|
||||
assertEquals(3, client.query(queryParams).getResults().size());
|
||||
assertEquals(0, client.query("multicollection2", queryParams).getResults().size());
|
||||
|
||||
SolrQuery query = new SolrQuery("*:*");
|
||||
query.set("collection", "multicollection2");
|
||||
assertEquals(0, client.query(query).getResults().size());
|
||||
|
||||
client.add("multicollection2", docs);
|
||||
client.commit("multicollection2");
|
||||
|
||||
assertEquals(3, client.query("multicollection2", queryParams).getResults().size());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void stateVersionParamTest() throws Exception {
|
||||
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
|
||||
cluster.waitForActiveCollection(COLLECTION, 2, 2);
|
||||
|
||||
DocCollection coll = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
|
||||
Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
|
||||
|
||||
SolrQuery q = new SolrQuery().setQuery("*:*");
|
||||
HttpSolrClient.RemoteSolrException sse = null;
|
||||
|
||||
final String url = r.getStr(ZkStateReader.BASE_URL_PROP) + "/" + COLLECTION;
|
||||
try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
|
||||
|
||||
log.info("should work query, result {}", solrClient.query(q));
|
||||
//no problem
|
||||
q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + coll.getZNodeVersion());
|
||||
log.info("2nd query , result {}", solrClient.query(q));
|
||||
//no error yet good
|
||||
|
||||
q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion() - 1)); //an older version expect error
|
||||
|
||||
QueryResponse rsp = solrClient.query(q);
|
||||
Map m = (Map) rsp.getResponse().get(CloudSolrClient.STATE_VERSION, rsp.getResponse().size()-1);
|
||||
assertNotNull("Expected an extra information from server with the list of invalid collection states", m);
|
||||
assertNotNull(m.get(COLLECTION));
|
||||
}
|
||||
|
||||
//now send the request to another node that does not serve the collection
|
||||
|
||||
Set<String> allNodesOfColl = new HashSet<>();
|
||||
for (Slice slice : coll.getSlices()) {
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP));
|
||||
}
|
||||
}
|
||||
String theNode = null;
|
||||
Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
|
||||
for (String s : liveNodes) {
|
||||
String n = cluster.getSolrClient().getZkStateReader().getBaseUrlForNodeName(s);
|
||||
if(!allNodesOfColl.contains(n)){
|
||||
theNode = n;
|
||||
break;
|
||||
}
|
||||
}
|
||||
log.info("the node which does not serve this collection{} ",theNode);
|
||||
assertNotNull(theNode);
|
||||
|
||||
|
||||
final String solrClientUrl = theNode + "/" + COLLECTION;
|
||||
try (SolrClient solrClient = getHttpSolrClient(solrClientUrl)) {
|
||||
|
||||
q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion()-1));
|
||||
try {
|
||||
QueryResponse rsp = solrClient.query(q);
|
||||
log.info("error was expected");
|
||||
} catch (HttpSolrClient.RemoteSolrException e) {
|
||||
sse = e;
|
||||
}
|
||||
assertNotNull(sse);
|
||||
assertEquals(" Error code should be 510", SolrException.ErrorCode.INVALID_STATE.code, sse.code());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShutdown() throws IOException {
|
||||
try (CloudSolrClient client = getCloudSolrClient("[ff01::114]:33332")) {
|
||||
client.setZkConnectTimeout(100);
|
||||
client.connect();
|
||||
fail("Expected exception");
|
||||
} catch (SolrException e) {
|
||||
assertTrue(e.getCause() instanceof TimeoutException);
|
||||
}
|
||||
}
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testWrongZkChrootTest() throws IOException {
|
||||
|
||||
exception.expect(SolrException.class);
|
||||
exception.expectMessage("cluster not found/not ready");
|
||||
|
||||
try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress() + "/xyz/foo")) {
|
||||
client.setZkClientTimeout(1000 * 60);
|
||||
client.connect();
|
||||
fail("Expected exception");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customHttpClientTest() throws IOException {
|
||||
CloseableHttpClient client = HttpClientUtil.createClient(null);
|
||||
try (CloudSolrClient solrClient = getCloudSolrClient(cluster.getZkServer().getZkAddress(), client)) {
|
||||
|
||||
assertTrue(solrClient.getLbClient().getHttpClient() == client);
|
||||
|
||||
} finally {
|
||||
HttpClientUtil.close(client);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVersionsAreReturned() throws Exception {
|
||||
CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1).process(cluster.getSolrClient());
|
||||
cluster.waitForActiveCollection("versions_collection", 2, 2);
|
||||
|
||||
// assert that "adds" are returned
|
||||
UpdateRequest updateRequest = new UpdateRequest()
|
||||
.add("id", "1", "a_t", "hello1")
|
||||
.add("id", "2", "a_t", "hello2");
|
||||
updateRequest.setParam(UpdateParams.VERSIONS, Boolean.TRUE.toString());
|
||||
|
||||
NamedList<Object> response = updateRequest.commit(getRandomClient(), "versions_collection").getResponse();
|
||||
Object addsObject = response.get("adds");
|
||||
|
||||
assertNotNull("There must be a adds parameter", addsObject);
|
||||
assertTrue(addsObject instanceof NamedList<?>);
|
||||
NamedList<?> adds = (NamedList<?>) addsObject;
|
||||
assertEquals("There must be 2 versions (one per doc)", 2, adds.size());
|
||||
|
||||
Map<String, Long> versions = new HashMap<>();
|
||||
Object object = adds.get("1");
|
||||
assertNotNull("There must be a version for id 1", object);
|
||||
assertTrue("Version for id 1 must be a long", object instanceof Long);
|
||||
versions.put("1", (Long) object);
|
||||
|
||||
object = adds.get("2");
|
||||
assertNotNull("There must be a version for id 2", object);
|
||||
assertTrue("Version for id 2 must be a long", object instanceof Long);
|
||||
versions.put("2", (Long) object);
|
||||
|
||||
QueryResponse resp = getRandomClient().query("versions_collection", new SolrQuery("*:*"));
|
||||
assertEquals("There should be one document because overwrite=true", 2, resp.getResults().getNumFound());
|
||||
|
||||
for (SolrDocument doc : resp.getResults()) {
|
||||
Long version = versions.get(doc.getFieldValue("id"));
|
||||
assertEquals("Version on add must match _version_ field", version, doc.getFieldValue("_version_"));
|
||||
}
|
||||
|
||||
// assert that "deletes" are returned
|
||||
UpdateRequest deleteRequest = new UpdateRequest().deleteById("1");
|
||||
deleteRequest.setParam(UpdateParams.VERSIONS, Boolean.TRUE.toString());
|
||||
response = deleteRequest.commit(getRandomClient(), "versions_collection").getResponse();
|
||||
Object deletesObject = response.get("deletes");
|
||||
assertNotNull("There must be a deletes parameter", deletesObject);
|
||||
NamedList deletes = (NamedList) deletesObject;
|
||||
assertEquals("There must be 1 version", 1, deletes.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInitializationWithSolrUrls() throws Exception {
|
||||
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
|
||||
cluster.waitForActiveCollection(COLLECTION, 2, 2);
|
||||
CloudHttp2SolrClient client = httpBasedCloudSolrClient;
|
||||
SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
|
||||
client.add(COLLECTION, doc);
|
||||
client.commit(COLLECTION);
|
||||
assertEquals(1, client.query(COLLECTION, params("q", "*:*")).getResults().getNumFound());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCollectionDoesntExist() throws Exception {
|
||||
CloudHttp2SolrClient client = getRandomClient();
|
||||
SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
|
||||
try {
|
||||
client.add("boguscollectionname", doc);
|
||||
fail();
|
||||
} catch (SolrException ex) {
|
||||
if (ex.getMessage().equals("Collection not found: boguscollectionname")) {
|
||||
// pass
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testRetryUpdatesWhenClusterStateIsStale() throws Exception {
|
||||
final String COL = "stale_state_test_col";
|
||||
assert cluster.getJettySolrRunners().size() >= 2;
|
||||
|
||||
final JettySolrRunner old_leader_node = cluster.getJettySolrRunners().get(0);
|
||||
final JettySolrRunner new_leader_node = cluster.getJettySolrRunners().get(1);
|
||||
|
||||
// start with exactly 1 shard/replica...
|
||||
assertEquals("Couldn't create collection", 0,
|
||||
CollectionAdminRequest.createCollection(COL, "conf", 1, 1)
|
||||
.setCreateNodeSet(old_leader_node.getNodeName())
|
||||
.process(cluster.getSolrClient()).getStatus());
|
||||
cluster.waitForActiveCollection(COL, 1, 1);
|
||||
|
||||
// determine the coreNodeName of only current replica
|
||||
Collection<Slice> slices = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COL).getSlices();
|
||||
assertEquals(1, slices.size()); // sanity check
|
||||
Slice slice = slices.iterator().next();
|
||||
assertEquals(1, slice.getReplicas().size()); // sanity check
|
||||
final String old_leader_core_node_name = slice.getLeader().getName();
|
||||
|
||||
// NOTE: creating our own CloudSolrClient whose settings we can muck with...
|
||||
try (CloudSolrClient stale_client = new CloudSolrClientBuilder
|
||||
(Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
|
||||
.sendDirectUpdatesToAnyShardReplica()
|
||||
.withParallelUpdates(true)
|
||||
.build()) {
|
||||
// don't let collection cache entries get expired, even on a slow machine...
|
||||
stale_client.setCollectionCacheTTl(Integer.MAX_VALUE);
|
||||
stale_client.setDefaultCollection(COL);
|
||||
|
||||
// do a query to populate stale_client's cache...
|
||||
assertEquals(0, stale_client.query(new SolrQuery("*:*")).getResults().getNumFound());
|
||||
|
||||
// add 1 replica on a diff node...
|
||||
assertEquals("Couldn't create collection", 0,
|
||||
CollectionAdminRequest.addReplicaToShard(COL, "shard1")
|
||||
.setNode(new_leader_node.getNodeName())
|
||||
// NOTE: don't use our stale_client for this -- don't tip it off of a collection change
|
||||
.process(cluster.getSolrClient()).getStatus());
|
||||
AbstractDistribZkTestBase.waitForRecoveriesToFinish
|
||||
(COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
|
||||
|
||||
// ...and delete our original leader.
|
||||
assertEquals("Couldn't create collection", 0,
|
||||
CollectionAdminRequest.deleteReplica(COL, "shard1", old_leader_core_node_name)
|
||||
// NOTE: don't use our stale_client for this -- don't tip it off of a collection change
|
||||
.process(cluster.getSolrClient()).getStatus());
|
||||
AbstractDistribZkTestBase.waitForRecoveriesToFinish
|
||||
(COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
|
||||
|
||||
// stale_client's collection state cache should now only point at a leader that no longer exists.
|
||||
|
||||
// attempt a (direct) update that should succeed in spite of cached cluster state
|
||||
// pointing solely to a node that's no longer part of our collection...
|
||||
assertEquals(0, (new UpdateRequest().add("id", "1").commit(stale_client, COL)).getStatus());
|
||||
assertEquals(1, stale_client.query(new SolrQuery("*:*")).getResults().getNumFound());
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static void checkSingleServer(NamedList<Object> response) {
|
||||
final RouteResponse rr = (RouteResponse) response;
|
||||
final Map<String,LBSolrClient.Req> routes = rr.getRoutes();
|
||||
final Iterator<Map.Entry<String,LBSolrClient.Req>> it =
|
||||
routes.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<String,LBSolrClient.Req> entry = it.next();
|
||||
assertEquals("wrong number of servers: "+entry.getValue().getServers(),
|
||||
1, entry.getValue().getServers().size());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests if the specification of 'preferReplicaTypes` in the query-params
|
||||
* limits the distributed query to locally hosted shards only
|
||||
*/
|
||||
@Test
|
||||
// commented 15-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
|
||||
public void preferReplicaTypesTest() throws Exception {
|
||||
|
||||
String collectionName = "replicaTypesTestColl";
|
||||
|
||||
int liveNodes = cluster.getJettySolrRunners().size();
|
||||
|
||||
// For these tests we need to have multiple replica types.
|
||||
// Hence the below configuration for our collection
|
||||
int pullReplicas = Math.max(1, liveNodes - 2);
|
||||
CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, 1, 1, pullReplicas)
|
||||
.setMaxShardsPerNode(liveNodes)
|
||||
.processAndWait(cluster.getSolrClient(), TIMEOUT);
|
||||
cluster.waitForActiveCollection(collectionName, liveNodes, liveNodes * (2 + pullReplicas));
|
||||
|
||||
// Add some new documents
|
||||
new UpdateRequest()
|
||||
.add(id, "0", "a_t", "hello1")
|
||||
.add(id, "2", "a_t", "hello2")
|
||||
.add(id, "3", "a_t", "hello2")
|
||||
.commit(getRandomClient(), collectionName);
|
||||
|
||||
// Run the actual tests for 'shards.preference=replica.type:*'
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "PULL", false, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", false, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "TLOG", false, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", false, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", false, collectionName);
|
||||
// Test to verify that preferLocalShards=true doesn't break this
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "PULL", true, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", true, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "TLOG", true, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", true, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", true, collectionName);
|
||||
}
|
||||
|
||||
private void queryWithPreferReplicaTypes(CloudHttp2SolrClient cloudClient,
|
||||
String preferReplicaTypes,
|
||||
boolean preferLocalShards,
|
||||
String collectionName)
|
||||
throws Exception
|
||||
{
|
||||
SolrQuery qRequest = new SolrQuery("*:*");
|
||||
ModifiableSolrParams qParams = new ModifiableSolrParams();
|
||||
|
||||
final List<String> preferredTypes = Arrays.asList(preferReplicaTypes.split("\\|"));
|
||||
StringBuilder rule = new StringBuilder();
|
||||
preferredTypes.forEach(type -> {
|
||||
if (rule.length() != 0) {
|
||||
rule.append(',');
|
||||
}
|
||||
rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE);
|
||||
rule.append(':');
|
||||
rule.append(type);
|
||||
});
|
||||
if (preferLocalShards) {
|
||||
if (rule.length() != 0) {
|
||||
rule.append(',');
|
||||
}
|
||||
rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION);
|
||||
rule.append(":local");
|
||||
}
|
||||
qParams.add(ShardParams.SHARDS_PREFERENCE, rule.toString());
|
||||
qParams.add(ShardParams.SHARDS_INFO, "true");
|
||||
qRequest.add(qParams);
|
||||
|
||||
// CloudSolrClient sends the request to some node.
|
||||
// And since all the nodes are hosting cores from all shards, the
|
||||
// distributed query formed by this node will select cores from the
|
||||
// local shards only
|
||||
QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
|
||||
|
||||
Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
|
||||
assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
|
||||
|
||||
Map<String, String> replicaTypeMap = new HashMap<>();
|
||||
DocCollection collection = getCollectionState(collectionName);
|
||||
for (Slice slice : collection.getSlices()) {
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
String coreUrl = replica.getCoreUrl();
|
||||
// It seems replica reports its core URL with a trailing slash while shard
|
||||
// info returned from the query doesn't. Oh well.
|
||||
if (coreUrl.endsWith("/")) {
|
||||
coreUrl = coreUrl.substring(0, coreUrl.length() - 1);
|
||||
}
|
||||
replicaTypeMap.put(coreUrl, replica.getType().toString());
|
||||
}
|
||||
}
|
||||
|
||||
// Iterate over shards-info and check that replicas of correct type responded
|
||||
SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
|
||||
Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
|
||||
List<String> shardAddresses = new ArrayList<String>();
|
||||
while (itr.hasNext()) {
|
||||
Map.Entry<String, ?> e = itr.next();
|
||||
assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
|
||||
String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
|
||||
assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
|
||||
assertTrue(replicaTypeMap.containsKey(shardAddress));
|
||||
assertTrue(preferredTypes.indexOf(replicaTypeMap.get(shardAddress)) == 0);
|
||||
shardAddresses.add(shardAddress);
|
||||
}
|
||||
assertTrue("No responses", shardAddresses.size() > 0);
|
||||
log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
|
||||
}
|
||||
|
||||
}
|
|
@ -108,7 +108,7 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
|
|||
private LBHttpSolrClient getMockLbHttpSolrClient(Map<String, Function> responses) throws Exception {
|
||||
LBHttpSolrClient mockLbclient = mock(LBHttpSolrClient.class);
|
||||
|
||||
when(mockLbclient.request(any(LBHttpSolrClient.Req.class))).then(invocationOnMock -> {
|
||||
when(mockLbclient.request(any(LBSolrClient.Req.class))).then(invocationOnMock -> {
|
||||
LBHttpSolrClient.Req req = invocationOnMock.getArgument(0);
|
||||
Function f = responses.get("request");
|
||||
if (f == null) return null;
|
||||
|
|
|
@ -86,6 +86,7 @@ import org.apache.lucene.util.TestUtil;
|
|||
import org.apache.solr.client.solrj.ResponseParser;
|
||||
import org.apache.solr.client.solrj.embedded.JettyConfig;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
|
||||
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
|
||||
|
@ -2279,6 +2280,61 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
|
|||
return (0 == TestUtil.nextInt(random(), 0, 9)) ? unlikely : likely;
|
||||
}
|
||||
|
||||
/**
|
||||
* A variant of {@link org.apache.solr.client.solrj.impl.CloudHttp2SolrClient.Builder} that will randomize
|
||||
* some internal settings.
|
||||
*/
|
||||
public static class CloudHttp2SolrClientBuilder extends CloudHttp2SolrClient.Builder {
|
||||
|
||||
public CloudHttp2SolrClientBuilder(List<String> zkHosts, Optional<String> zkChroot) {
|
||||
super(zkHosts, zkChroot);
|
||||
randomizeCloudSolrClient();
|
||||
}
|
||||
|
||||
public CloudHttp2SolrClientBuilder(ClusterStateProvider stateProvider) {
|
||||
super(new ArrayList<>());
|
||||
this.stateProvider = stateProvider;
|
||||
randomizeCloudSolrClient();
|
||||
}
|
||||
|
||||
public CloudHttp2SolrClientBuilder(MiniSolrCloudCluster cluster) {
|
||||
super(new ArrayList<>());
|
||||
if (random().nextBoolean()) {
|
||||
this.zkHosts.add(cluster.getZkServer().getZkAddress());
|
||||
} else {
|
||||
populateSolrUrls(cluster);
|
||||
}
|
||||
|
||||
randomizeCloudSolrClient();
|
||||
}
|
||||
|
||||
private void populateSolrUrls(MiniSolrCloudCluster cluster) {
|
||||
if (random().nextBoolean()) {
|
||||
final List<JettySolrRunner> solrNodes = cluster.getJettySolrRunners();
|
||||
for (JettySolrRunner node : solrNodes) {
|
||||
this.solrUrls.add(node.getBaseUrl().toString());
|
||||
}
|
||||
} else {
|
||||
this.solrUrls.add(cluster.getRandomJetty(random()).getBaseUrl().toString());
|
||||
}
|
||||
}
|
||||
|
||||
private void randomizeCloudSolrClient() {
|
||||
this.directUpdatesToLeadersOnly = random().nextBoolean();
|
||||
this.shardLeadersOnly = random().nextBoolean();
|
||||
this.parallelUpdates = random().nextBoolean();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method <i>may</i> randomize unspecified aspects of the resulting SolrClient.
|
||||
* Tests that do not wish to have any randomized behavior should use the
|
||||
* {@link org.apache.solr.client.solrj.impl.CloudHttp2SolrClient.Builder} class directly
|
||||
*/
|
||||
public static CloudHttp2SolrClient getCloudHttp2SolrClient(MiniSolrCloudCluster cluster) {
|
||||
return new CloudHttp2SolrClientBuilder(cluster).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* A variant of {@link org.apache.solr.client.solrj.impl.CloudSolrClient.Builder} that will randomize
|
||||
* some internal settings.
|
||||
|
|
Loading…
Reference in New Issue