SOLR-9784: Refactor CloudSolrClient to eliminate direct dependency on ZK

SOLR-9512: CloudSolrClient's cluster state cache can break direct updates to leaders
This commit is contained in:
Noble Paul 2016-11-25 00:27:16 +05:30
parent eb51ebc0e9
commit e309f90589
7 changed files with 595 additions and 120 deletions

View File

@ -143,6 +143,8 @@ Bug Fixes
* SOLR-9626: new Admin UI now also highlights matched terms in the Analysis screen. (Alexandre Rafalovitch) * SOLR-9626: new Admin UI now also highlights matched terms in the Analysis screen. (Alexandre Rafalovitch)
* SOLR-9512: CloudSolrClient's cluster state cache can break direct updates to leaders (noble)
Other Changes Other Changes
---------------------- ----------------------
@ -168,6 +170,8 @@ Other Changes
* SOLR-8785: Use Dropwizard Metrics library for core metrics. The copied over code in * SOLR-8785: Use Dropwizard Metrics library for core metrics. The copied over code in
org.apache.solr.util.stats has been removed. (Jeff Wartes, Kelvin Wong, Christine Poerschke, shalin) org.apache.solr.util.stats has been removed. (Jeff Wartes, Kelvin Wong, Christine Poerschke, shalin)
* SOLR-9784: Refactor CloudSolrClient to eliminate direct dependency on ZK (noble)
================== 6.3.0 ================== ================== 6.3.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -40,6 +40,10 @@
<dependency org="org.slf4j" name="jcl-over-slf4j" rev="${/org.slf4j/jcl-over-slf4j}" conf="compile"/> <dependency org="org.slf4j" name="jcl-over-slf4j" rev="${/org.slf4j/jcl-over-slf4j}" conf="compile"/>
<dependency org="org.slf4j" name="slf4j-log4j12" rev="${/org.slf4j/slf4j-log4j12}" conf="test"/> <dependency org="org.slf4j" name="slf4j-log4j12" rev="${/org.slf4j/slf4j-log4j12}" conf="test"/>
<dependency org="org.easymock" name="easymock" rev="${/org.easymock/easymock}" conf="test"/>
<dependency org="cglib" name="cglib-nodep" rev="${/cglib/cglib-nodep}" conf="test"/>
<dependency org="org.objenesis" name="objenesis" rev="${/org.objenesis/objenesis}" conf="test"/>
<exclude org="*" ext="*" matcher="regexp" type="${ivy.exclude.types}"/> <exclude org="*" ext="*" matcher="regexp" type="${ivy.exclude.types}"/>
</dependencies> </dependencies>

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.solr.client.solrj.impl; package org.apache.solr.client.solrj.impl;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.net.ConnectException; import java.net.ConnectException;
@ -37,6 +38,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -56,7 +58,6 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.ToleratedUpdateError; import org.apache.solr.common.ToleratedUpdateError;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStatePredicate; import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.CollectionStateWatcher;
@ -68,7 +69,6 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
@ -79,7 +79,6 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.SolrjNamedThreadFactory; import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.MDC; import org.slf4j.MDC;
@ -101,10 +100,7 @@ public class CloudSolrClient extends SolrClient {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private volatile ZkStateReader zkStateReader; private final ClusterStateProvider stateProvider;
private String zkHost; // the zk server connect string
private int zkConnectTimeout = 10000;
private int zkClientTimeout = 10000;
private volatile String defaultCollection; private volatile String defaultCollection;
private final LBHttpSolrClient lbClient; private final LBHttpSolrClient lbClient;
private final boolean shutdownLBHttpSolrServer; private final boolean shutdownLBHttpSolrServer;
@ -122,6 +118,7 @@ public class CloudSolrClient extends SolrClient {
"CloudSolrClient ThreadPool")); "CloudSolrClient ThreadPool"));
private String idField = "id"; private String idField = "id";
public static final String STATE_VERSION = "_stateVer_"; public static final String STATE_VERSION = "_stateVer_";
private long retryExpiryTime = TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS);//3 seconds or 3 million nanos
private final Set<String> NON_ROUTABLE_PARAMS; private final Set<String> NON_ROUTABLE_PARAMS;
{ {
NON_ROUTABLE_PARAMS = new HashSet<>(); NON_ROUTABLE_PARAMS = new HashSet<>();
@ -139,12 +136,15 @@ public class CloudSolrClient extends SolrClient {
// NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK); // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
} }
private volatile long timeToLive = 60* 1000L;
private volatile List<Object> locks = objectList(3); private volatile List<Object> locks = objectList(3);
protected final Map<String, ExpiringCachedDocCollection> collectionStateCache = new ConcurrentHashMap<String, ExpiringCachedDocCollection>(){ static class StateCache extends ConcurrentHashMap<String, ExpiringCachedDocCollection> {
final AtomicLong puts = new AtomicLong();
final AtomicLong hits = new AtomicLong();
final Lock evictLock = new ReentrantLock(true); final Lock evictLock = new ReentrantLock(true);
private volatile long timeToLive = 60 * 1000L;
@Override @Override
public ExpiringCachedDocCollection get(Object key) { public ExpiringCachedDocCollection get(Object key) {
ExpiringCachedDocCollection val = super.get(key); ExpiringCachedDocCollection val = super.get(key);
@ -158,9 +158,16 @@ public class CloudSolrClient extends SolrClient {
super.remove(key); super.remove(key);
return null; return null;
} }
hits.incrementAndGet();
return val; return val;
} }
@Override
public ExpiringCachedDocCollection put(String key, ExpiringCachedDocCollection value) {
puts.incrementAndGet();
return super.put(key, value);
}
void evictStale() { void evictStale() {
if(!evictLock.tryLock()) return; if(!evictLock.tryLock()) return;
try { try {
@ -174,15 +181,30 @@ public class CloudSolrClient extends SolrClient {
} }
} }
}; }
/**
* This is the time to wait to refetch the state
* after getting the same state version from ZK
* <p>
* secs
*/
public void setRetryExpiryTime(int secs) {
this.retryExpiryTime = TimeUnit.NANOSECONDS.convert(secs, TimeUnit.SECONDS);
}
public void setSoTimeout(int timeout) { public void setSoTimeout(int timeout) {
lbClient.setSoTimeout(timeout); lbClient.setSoTimeout(timeout);
} }
protected final StateCache collectionStateCache = new StateCache();
class ExpiringCachedDocCollection { class ExpiringCachedDocCollection {
final DocCollection cached; final DocCollection cached;
long cachedAt; final long cachedAt;
//This is the time at which the collection is retried and got the same old version
long retriedAt = -1;
//flag that suggests that this is potentially to be rechecked
boolean maybeStale = false;
ExpiringCachedDocCollection(DocCollection cached) { ExpiringCachedDocCollection(DocCollection cached) {
this.cached = cached; this.cached = cached;
@ -193,6 +215,21 @@ public class CloudSolrClient extends SolrClient {
return (System.nanoTime() - cachedAt) return (System.nanoTime() - cachedAt)
> TimeUnit.NANOSECONDS.convert(timeToLiveMs, TimeUnit.MILLISECONDS); > TimeUnit.NANOSECONDS.convert(timeToLiveMs, TimeUnit.MILLISECONDS);
} }
boolean shoulRetry() {
if (maybeStale) {// we are not sure if it is stale so check with retry time
if ((retriedAt == -1 ||
(System.nanoTime() - retriedAt) > retryExpiryTime)) {
return true;// we retried a while back. and we could not get anything new.
//it's likely that it is not going to be available now also.
}
}
return false;
}
void setRetriedAt() {
retriedAt = System.nanoTime();
}
} }
/** /**
@ -219,7 +256,7 @@ public class CloudSolrClient extends SolrClient {
*/ */
@Deprecated @Deprecated
public CloudSolrClient(String zkHost) { public CloudSolrClient(String zkHost) {
this.zkHost = zkHost; this.stateProvider = new ZkClientClusterStateProvider(zkHost);
this.clientIsInternal = true; this.clientIsInternal = true;
this.myClient = HttpClientUtil.createClient(null); this.myClient = HttpClientUtil.createClient(null);
this.lbClient = new LBHttpSolrClient.Builder() this.lbClient = new LBHttpSolrClient.Builder()
@ -259,8 +296,8 @@ public class CloudSolrClient extends SolrClient {
* @deprecated use {@link Builder} instead. * @deprecated use {@link Builder} instead.
*/ */
@Deprecated @Deprecated
public CloudSolrClient(String zkHost, HttpClient httpClient) { public CloudSolrClient(String zkHost, HttpClient httpClient) {
this.zkHost = zkHost; this.stateProvider = new ZkClientClusterStateProvider(zkHost);
this.clientIsInternal = httpClient == null; this.clientIsInternal = httpClient == null;
this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient; this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
this.lbClient = createLBHttpSolrClient(myClient); this.lbClient = createLBHttpSolrClient(myClient);
@ -318,7 +355,7 @@ public class CloudSolrClient extends SolrClient {
*/ */
@Deprecated @Deprecated
public CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient) { public CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient) {
this.zkHost = buildZkHostString(zkHosts, chroot); this.stateProvider = new ZkClientClusterStateProvider(zkHosts, chroot);
this.clientIsInternal = httpClient == null; this.clientIsInternal = httpClient == null;
this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient; this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
this.lbClient = createLBHttpSolrClient(myClient); this.lbClient = createLBHttpSolrClient(myClient);
@ -354,7 +391,7 @@ public class CloudSolrClient extends SolrClient {
*/ */
@Deprecated @Deprecated
public CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient, LBHttpSolrClient lbSolrClient, boolean updatesToLeaders) { public CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient, LBHttpSolrClient lbSolrClient, boolean updatesToLeaders) {
this(zkHosts, chroot, httpClient, lbSolrClient, null, updatesToLeaders, false); this(zkHosts, chroot, httpClient, lbSolrClient, null, updatesToLeaders, false, null);
} }
/** /**
@ -389,8 +426,15 @@ public class CloudSolrClient extends SolrClient {
LBHttpSolrClient lbSolrClient, LBHttpSolrClient lbSolrClient,
LBHttpSolrClient.Builder lbHttpSolrClientBuilder, LBHttpSolrClient.Builder lbHttpSolrClientBuilder,
boolean updatesToLeaders, boolean updatesToLeaders,
boolean directUpdatesToLeadersOnly) { boolean directUpdatesToLeadersOnly,
this.zkHost = buildZkHostString(zkHosts, chroot); ClusterStateProvider stateProvider
) {
if (stateProvider == null) {
this.stateProvider = new ZkClientClusterStateProvider(zkHosts, chroot);
} else {
this.stateProvider = stateProvider;
}
this.clientIsInternal = httpClient == null; this.clientIsInternal = httpClient == null;
this.shutdownLBHttpSolrServer = lbSolrClient == null; this.shutdownLBHttpSolrServer = lbSolrClient == null;
if(lbHttpSolrClientBuilder != null) lbSolrClient = lbHttpSolrClientBuilder.build(); if(lbHttpSolrClientBuilder != null) lbSolrClient = lbHttpSolrClientBuilder.build();
@ -428,7 +472,7 @@ public class CloudSolrClient extends SolrClient {
*/ */
@Deprecated @Deprecated
public CloudSolrClient(String zkHost, boolean updatesToLeaders, HttpClient httpClient) { public CloudSolrClient(String zkHost, boolean updatesToLeaders, HttpClient httpClient) {
this.zkHost = zkHost; this.stateProvider = new ZkClientClusterStateProvider(zkHost);
this.clientIsInternal = httpClient == null; this.clientIsInternal = httpClient == null;
this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient; this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
this.lbClient = new LBHttpSolrClient.Builder() this.lbClient = new LBHttpSolrClient.Builder()
@ -447,7 +491,7 @@ public class CloudSolrClient extends SolrClient {
*/ */
public void setCollectionCacheTTl(int seconds){ public void setCollectionCacheTTl(int seconds){
assert seconds > 0; assert seconds > 0;
timeToLive = seconds*1000L; this.collectionStateCache.timeToLive = seconds * 1000L;
} }
/** /**
@ -475,8 +519,8 @@ public class CloudSolrClient extends SolrClient {
*/ */
@Deprecated @Deprecated
public CloudSolrClient(String zkHost, LBHttpSolrClient lbClient, boolean updatesToLeaders) { public CloudSolrClient(String zkHost, LBHttpSolrClient lbClient, boolean updatesToLeaders) {
this.zkHost = zkHost;
this.lbClient = lbClient; this.lbClient = lbClient;
this.stateProvider = new ZkClientClusterStateProvider(zkHost);
this.updatesToLeaders = updatesToLeaders; this.updatesToLeaders = updatesToLeaders;
this.directUpdatesToLeadersOnly = false; this.directUpdatesToLeadersOnly = false;
shutdownLBHttpSolrServer = false; shutdownLBHttpSolrServer = false;
@ -512,11 +556,15 @@ public class CloudSolrClient extends SolrClient {
* @return the zkHost value used to connect to zookeeper. * @return the zkHost value used to connect to zookeeper.
*/ */
public String getZkHost() { public String getZkHost() {
return zkHost; return assertZKStateProvider().zkHost;
} }
public ZkStateReader getZkStateReader() { public ZkStateReader getZkStateReader() {
return zkStateReader; if (stateProvider instanceof ZkClientClusterStateProvider) {
ZkClientClusterStateProvider provider = (ZkClientClusterStateProvider) stateProvider;
return provider.zkStateReader;
}
throw new IllegalStateException("This has no Zk stateReader");
} }
/** /**
@ -545,12 +593,12 @@ public class CloudSolrClient extends SolrClient {
/** Set the connect timeout to the zookeeper ensemble in ms */ /** Set the connect timeout to the zookeeper ensemble in ms */
public void setZkConnectTimeout(int zkConnectTimeout) { public void setZkConnectTimeout(int zkConnectTimeout) {
this.zkConnectTimeout = zkConnectTimeout; assertZKStateProvider().zkConnectTimeout = zkConnectTimeout;
} }
/** Set the timeout to the zookeeper ensemble in ms */ /** Set the timeout to the zookeeper ensemble in ms */
public void setZkClientTimeout(int zkClientTimeout) { public void setZkClientTimeout(int zkClientTimeout) {
this.zkClientTimeout = zkClientTimeout; assertZKStateProvider().zkClientTimeout = zkClientTimeout;
} }
/** /**
@ -559,29 +607,7 @@ public class CloudSolrClient extends SolrClient {
* *
*/ */
public void connect() { public void connect() {
if (zkStateReader == null) { stateProvider.connect();
synchronized (this) {
if (zkStateReader == null) {
ZkStateReader zk = null;
try {
zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
zk.createClusterStateWatchersAndUpdate();
zkStateReader = zk;
} catch (InterruptedException e) {
zk.close();
Thread.currentThread().interrupt();
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (KeeperException e) {
zk.close();
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (Exception e) {
if (zk != null) zk.close();
// do not wrap because clients may be relying on the underlying exception being thrown
throw e;
}
}
}
}
} }
/** /**
@ -592,12 +618,12 @@ public class CloudSolrClient extends SolrClient {
* @throws InterruptedException if the wait is interrupted * @throws InterruptedException if the wait is interrupted
*/ */
public void connect(long duration, TimeUnit timeUnit) throws TimeoutException, InterruptedException { public void connect(long duration, TimeUnit timeUnit) throws TimeoutException, InterruptedException {
log.info("Waiting for {} {} for cluster at {} to be ready", duration, timeUnit, zkHost); log.info("Waiting for {} {} for cluster at {} to be ready", duration, timeUnit, stateProvider);
long timeout = System.nanoTime() + timeUnit.toNanos(duration); long timeout = System.nanoTime() + timeUnit.toNanos(duration);
while (System.nanoTime() < timeout) { while (System.nanoTime() < timeout) {
try { try {
connect(); connect();
log.info("Cluster at {} ready", zkHost); log.info("Cluster at {} ready", stateProvider);
return; return;
} }
catch (RuntimeException e) { catch (RuntimeException e) {
@ -624,8 +650,16 @@ public class CloudSolrClient extends SolrClient {
* @throws IOException if an IO error occurs * @throws IOException if an IO error occurs
*/ */
public void uploadConfig(Path configPath, String configName) throws IOException { public void uploadConfig(Path configPath, String configName) throws IOException {
connect(); stateProvider.connect();
zkStateReader.getConfigManager().uploadConfigDir(configPath, configName); assertZKStateProvider().uploadConfig(configPath, configName);
}
private ZkClientClusterStateProvider assertZKStateProvider() {
if (stateProvider instanceof ZkClientClusterStateProvider) {
return (ZkClientClusterStateProvider) stateProvider;
}
throw new IllegalArgumentException("This client does not use ZK");
} }
/** /**
@ -635,8 +669,7 @@ public class CloudSolrClient extends SolrClient {
* @throws IOException if an I/O exception occurs * @throws IOException if an I/O exception occurs
*/ */
public void downloadConfig(String configName, Path downloadPath) throws IOException { public void downloadConfig(String configName, Path downloadPath) throws IOException {
connect(); assertZKStateProvider().downloadConfig(configName, downloadPath);
zkStateReader.getConfigManager().downloadConfigDir(configName, downloadPath);
} }
/** /**
@ -654,8 +687,8 @@ public class CloudSolrClient extends SolrClient {
*/ */
public void waitForState(String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate) public void waitForState(String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
throws InterruptedException, TimeoutException { throws InterruptedException, TimeoutException {
connect(); stateProvider.connect();
zkStateReader.waitForState(collection, wait, unit, predicate); assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
} }
/** /**
@ -669,11 +702,11 @@ public class CloudSolrClient extends SolrClient {
* @param watcher a watcher that will be called when the state changes * @param watcher a watcher that will be called when the state changes
*/ */
public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) { public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
connect(); stateProvider.connect();
zkStateReader.registerCollectionStateWatcher(collection, watcher); assertZKStateProvider().zkStateReader.registerCollectionStateWatcher(collection, watcher);
} }
private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection, ClusterState clusterState) throws SolrServerException { private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException {
UpdateRequest updateRequest = (UpdateRequest) request; UpdateRequest updateRequest = (UpdateRequest) request;
ModifiableSolrParams params = (ModifiableSolrParams) request.getParams(); ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
ModifiableSolrParams routableParams = new ModifiableSolrParams(); ModifiableSolrParams routableParams = new ModifiableSolrParams();
@ -693,15 +726,9 @@ public class CloudSolrClient extends SolrClient {
//Check to see if the collection is an alias. //Check to see if the collection is an alias.
Aliases aliases = zkStateReader.getAliases(); collection = stateProvider.getCollectionName(collection);
if(aliases != null) {
Map<String, String> collectionAliases = aliases.getCollectionAliasMap();
if(collectionAliases != null && collectionAliases.containsKey(collection)) {
collection = collectionAliases.get(collection);
}
}
DocCollection col = getDocCollection(clusterState, collection,null); DocCollection col = getDocCollection(collection, null);
DocRouter router = col.getRouter(); DocRouter router = col.getRouter();
@ -1022,12 +1049,12 @@ public class CloudSolrClient extends SolrClient {
List<DocCollection> requestedCollections = null; List<DocCollection> requestedCollections = null;
boolean isAdmin = ADMIN_PATHS.contains(request.getPath()); boolean isAdmin = ADMIN_PATHS.contains(request.getPath());
if (collection != null && !isAdmin) { // don't do _stateVer_ checking for admin requests if (collection != null && !isAdmin) { // don't do _stateVer_ checking for admin requests
Set<String> requestedCollectionNames = getCollectionNames(getZkStateReader().getClusterState(), collection); Set<String> requestedCollectionNames = getCollectionNames(collection);
StringBuilder stateVerParamBuilder = null; StringBuilder stateVerParamBuilder = null;
for (String requestedCollection : requestedCollectionNames) { for (String requestedCollection : requestedCollectionNames) {
// track the version of state we're using on the client side using the _stateVer_ param // track the version of state we're using on the client side using the _stateVer_ param
DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection,null); DocCollection coll = getDocCollection(requestedCollection, null);
int collVer = coll.getZNodeVersion(); int collVer = coll.getZNodeVersion();
if (coll.getStateFormat()>1) { if (coll.getStateFormat()>1) {
if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size()); if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
@ -1068,7 +1095,7 @@ public class CloudSolrClient extends SolrClient {
Map invalidStates = (Map) o; Map invalidStates = (Map) o;
for (Object invalidEntries : invalidStates.entrySet()) { for (Object invalidEntries : invalidStates.entrySet()) {
Map.Entry e = (Map.Entry) invalidEntries; Map.Entry e = (Map.Entry) invalidEntries;
getDocCollection(getZkStateReader().getClusterState(),(String)e.getKey(), (Integer)e.getValue()); getDocCollection((String) e.getKey(), (Integer) e.getValue());
} }
} }
@ -1101,6 +1128,26 @@ public class CloudSolrClient extends SolrClient {
rootCause instanceof NoHttpResponseException || rootCause instanceof NoHttpResponseException ||
rootCause instanceof SocketException); rootCause instanceof SocketException);
if (wasCommError) {
// it was a communication error. it is likely that
// the node to which the request to be sent is down . So , expire the state
// so that the next attempt would fetch the fresh state
// just re-read state for all of them, if it has not been retired
// in retryExpiryTime time
for (DocCollection ext : requestedCollections) {
ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(ext.getName());
if (cacheEntry == null) continue;
cacheEntry.maybeStale = true;
}
if (retryCount < MAX_STALE_RETRIES) {//if it is a communication error , we must try again
//may be, we have a stale version of the collection state
// and we could not get any information from the server
//it is probably not worth trying again and again because
// the state would not have been updated
return requestWithRetryOnStaleState(request, retryCount + 1, collection);
}
}
boolean stateWasStale = false; boolean stateWasStale = false;
if (retryCount < MAX_STALE_RETRIES && if (retryCount < MAX_STALE_RETRIES &&
requestedCollections != null && requestedCollections != null &&
@ -1125,7 +1172,7 @@ public class CloudSolrClient extends SolrClient {
!requestedCollections.isEmpty() && !requestedCollections.isEmpty() &&
wasCommError) { wasCommError) {
for (DocCollection ext : requestedCollections) { for (DocCollection ext : requestedCollections) {
DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName(),null); DocCollection latestStateFromZk = getDocCollection(ext.getName(), null);
if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) { if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
// looks like we couldn't reach the server because the state was stale == retry // looks like we couldn't reach the server because the state was stale == retry
stateWasStale = true; stateWasStale = true;
@ -1163,14 +1210,12 @@ public class CloudSolrClient extends SolrClient {
throws SolrServerException, IOException { throws SolrServerException, IOException {
connect(); connect();
ClusterState clusterState = zkStateReader.getClusterState();
boolean sendToLeaders = false; boolean sendToLeaders = false;
List<String> replicas = null; List<String> replicas = null;
if (request instanceof IsUpdateRequest) { if (request instanceof IsUpdateRequest) {
if (request instanceof UpdateRequest) { if (request instanceof UpdateRequest) {
NamedList<Object> response = directUpdate((AbstractUpdateRequest) request, collection, clusterState); NamedList<Object> response = directUpdate((AbstractUpdateRequest) request, collection);
if (response != null) { if (response != null) {
return response; return response;
} }
@ -1185,9 +1230,10 @@ public class CloudSolrClient extends SolrClient {
} }
List<String> theUrlList = new ArrayList<>(); List<String> theUrlList = new ArrayList<>();
if (ADMIN_PATHS.contains(request.getPath())) { if (ADMIN_PATHS.contains(request.getPath())) {
Set<String> liveNodes = clusterState.getLiveNodes(); Set<String> liveNodes = getLiveNodes();
for (String liveNode : liveNodes) { for (String liveNode : liveNodes) {
theUrlList.add(zkStateReader.getBaseUrlForNodeName(liveNode)); theUrlList.add(ZkStateReader.getBaseUrlForNodeName(liveNode,
(String) stateProvider.getClusterProperties().getOrDefault(ZkStateReader.URL_SCHEME,"http")));
} }
} else { } else {
@ -1196,7 +1242,7 @@ public class CloudSolrClient extends SolrClient {
"No collection param specified on request and no default collection has been set."); "No collection param specified on request and no default collection has been set.");
} }
Set<String> collectionNames = getCollectionNames(clusterState, collection); Set<String> collectionNames = getCollectionNames(collection);
if (collectionNames.size() == 0) { if (collectionNames.size() == 0) {
throw new SolrException(ErrorCode.BAD_REQUEST, throw new SolrException(ErrorCode.BAD_REQUEST,
"Could not find collection: " + collection); "Could not find collection: " + collection);
@ -1213,11 +1259,11 @@ public class CloudSolrClient extends SolrClient {
// add it to the Map of slices. // add it to the Map of slices.
Map<String,Slice> slices = new HashMap<>(); Map<String,Slice> slices = new HashMap<>();
for (String collectionName : collectionNames) { for (String collectionName : collectionNames) {
DocCollection col = getDocCollection(clusterState, collectionName, null); DocCollection col = getDocCollection(collectionName, null);
Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col); Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
ClientUtils.addSlices(slices, collectionName, routeSlices, true); ClientUtils.addSlices(slices, collectionName, routeSlices, true);
} }
Set<String> liveNodes = clusterState.getLiveNodes(); Set<String> liveNodes = getLiveNodes();
List<String> leaderUrlList = null; List<String> leaderUrlList = null;
List<String> urlList = null; List<String> urlList = null;
@ -1293,16 +1339,18 @@ public class CloudSolrClient extends SolrClient {
return rsp.getResponse(); return rsp.getResponse();
} }
private Set<String> getCollectionNames(ClusterState clusterState, private Set<String> getLiveNodes() {
String collection) { return getZkStateReader().getClusterState().getLiveNodes();
}
Set<String> getCollectionNames(String collection) {
// Extract each comma separated collection name and store in a List. // Extract each comma separated collection name and store in a List.
List<String> rawCollectionsList = StrUtils.splitSmart(collection, ",", true); List<String> rawCollectionsList = StrUtils.splitSmart(collection, ",", true);
Set<String> collectionNames = new HashSet<>(); Set<String> collectionNames = new HashSet<>();
// validate collections // validate collections
for (String collectionName : rawCollectionsList) { for (String collectionName : rawCollectionsList) {
if (!clusterState.hasCollection(collectionName)) { if (stateProvider.getState(collectionName) == null) {
Aliases aliases = zkStateReader.getAliases(); String alias = stateProvider.getAlias(collection);
String alias = aliases.getCollectionAlias(collectionName);
if (alias != null) { if (alias != null) {
List<String> aliasList = StrUtils.splitSmart(alias, ",", true); List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
collectionNames.addAll(aliasList); collectionNames.addAll(aliasList);
@ -1319,13 +1367,7 @@ public class CloudSolrClient extends SolrClient {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (zkStateReader != null) { stateProvider.close();
synchronized(this) {
if (zkStateReader!= null)
zkStateReader.close();
zkStateReader = null;
}
}
if (shutdownLBHttpSolrServer) { if (shutdownLBHttpSolrServer) {
lbClient.close(); lbClient.close();
@ -1371,15 +1413,17 @@ public class CloudSolrClient extends SolrClient {
} }
protected DocCollection getDocCollection(ClusterState clusterState, String collection, Integer expectedVersion) throws SolrException { protected DocCollection getDocCollection(String collection, Integer expectedVersion) throws SolrException {
if (expectedVersion == null) expectedVersion = -1;
if (collection == null) return null; if (collection == null) return null;
DocCollection col = getFromCache(collection); ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(collection);
DocCollection col = cacheEntry == null ? null : cacheEntry.cached;
if (col != null) { if (col != null) {
if (expectedVersion == null) return col; if (expectedVersion <= col.getZNodeVersion()
if (expectedVersion.intValue() == col.getZNodeVersion()) return col; && !cacheEntry.shoulRetry()) return col;
} }
ClusterState.CollectionRef ref = clusterState.getCollectionRef(collection); ClusterState.CollectionRef ref = getCollectionRef(collection);
if (ref == null) { if (ref == null) {
//no such collection exists //no such collection exists
return null; return null;
@ -1390,30 +1434,34 @@ public class CloudSolrClient extends SolrClient {
} }
List locks = this.locks; List locks = this.locks;
final Object lock = locks.get(Math.abs(Hash.murmurhash3_x86_32(collection, 0, collection.length(), 0) % locks.size())); final Object lock = locks.get(Math.abs(Hash.murmurhash3_x86_32(collection, 0, collection.length(), 0) % locks.size()));
DocCollection fetchedCol = null;
synchronized (lock) { synchronized (lock) {
//we have waited for sometime just check once again /*we have waited for sometime just check once again*/
col = getFromCache(collection); cacheEntry = collectionStateCache.get(collection);
col = cacheEntry == null ? null : cacheEntry.cached;
if (col != null) { if (col != null) {
if (expectedVersion == null) return col; if (expectedVersion <= col.getZNodeVersion()
if (expectedVersion.intValue() == col.getZNodeVersion()) { && !cacheEntry.shoulRetry()) return col;
return col;
} else {
collectionStateCache.remove(collection);
}
} }
col = ref.get();//this is a call to ZK // We are going to fetch a new version
// we MUST try to get a new version
fetchedCol = ref.get();//this is a call to ZK
if (fetchedCol == null) return null;// this collection no more exists
if (col != null && fetchedCol.getZNodeVersion() == col.getZNodeVersion()) {
cacheEntry.setRetriedAt();//we retried and found that it is the same version
cacheEntry.maybeStale = false;
} else {
if (fetchedCol.getStateFormat() > 1)
collectionStateCache.put(collection, new ExpiringCachedDocCollection(fetchedCol));
}
return fetchedCol;
} }
if (col == null) return null;
if (col.getStateFormat() > 1) collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));
return col;
} }
private DocCollection getFromCache(String c){ ClusterState.CollectionRef getCollectionRef(String collection) {
ExpiringCachedDocCollection cachedState = collectionStateCache.get(c); return stateProvider.getState(collection);
return cachedState != null ? cachedState.cached : null;
} }
/** /**
* Useful for determining the minimum achieved replication factor across * Useful for determining the minimum achieved replication factor across
* all shards involved in processing an update request, typically useful * all shards involved in processing an update request, typically useful
@ -1449,9 +1497,9 @@ public class CloudSolrClient extends SolrClient {
Map<String,Integer> results = new HashMap<String,Integer>(); Map<String,Integer> results = new HashMap<String,Integer>();
if (resp instanceof CloudSolrClient.RouteResponse) { if (resp instanceof CloudSolrClient.RouteResponse) {
NamedList routes = ((CloudSolrClient.RouteResponse)resp).getRouteResponses(); NamedList routes = ((CloudSolrClient.RouteResponse)resp).getRouteResponses();
ClusterState clusterState = zkStateReader.getClusterState(); DocCollection coll = getDocCollection(collection, null);
Map<String,String> leaders = new HashMap<String,String>(); Map<String,String> leaders = new HashMap<String,String>();
for (Slice slice : clusterState.getActiveSlices(collection)) { for (Slice slice : coll.getActiveSlices()) {
Replica leader = slice.getLeader(); Replica leader = slice.getLeader();
if (leader != null) { if (leader != null) {
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader); ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
@ -1560,6 +1608,8 @@ public class CloudSolrClient extends SolrClient {
private LBHttpSolrClient.Builder lbClientBuilder; private LBHttpSolrClient.Builder lbClientBuilder;
private boolean shardLeadersOnly; private boolean shardLeadersOnly;
private boolean directUpdatesToLeadersOnly; private boolean directUpdatesToLeadersOnly;
private ClusterStateProvider stateProvider;
public Builder() { public Builder() {
this.zkHosts = new ArrayList(); this.zkHosts = new ArrayList();
@ -1662,12 +1712,35 @@ public class CloudSolrClient extends SolrClient {
return this; return this;
} }
public Builder withClusterStateProvider(ClusterStateProvider stateProvider) {
this.stateProvider = stateProvider;
return this;
}
/** /**
* Create a {@link CloudSolrClient} based on the provided configuration. * Create a {@link CloudSolrClient} based on the provided configuration.
*/ */
public CloudSolrClient build() { public CloudSolrClient build() {
if (stateProvider == null) {
stateProvider = new ZkClientClusterStateProvider(zkHosts, zkChroot);
}
return new CloudSolrClient(zkHosts, zkChroot, httpClient, loadBalancedSolrClient, lbClientBuilder, return new CloudSolrClient(zkHosts, zkChroot, httpClient, loadBalancedSolrClient, lbClientBuilder,
shardLeadersOnly, directUpdatesToLeadersOnly); shardLeadersOnly, directUpdatesToLeadersOnly, stateProvider);
} }
} }
interface ClusterStateProvider extends Closeable {
ClusterState.CollectionRef getState(String collection);
Set<String> liveNodes();
String getAlias(String collection);
String getCollectionName(String name);
Map<String, Object> getClusterProperties();
void connect();
}
} }

View File

@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ZkClientClusterStateProvider implements CloudSolrClient.ClusterStateProvider {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
ZkStateReader zkStateReader;
String zkHost;
int zkConnectTimeout = 10000;
int zkClientTimeout = 10000;
public ZkClientClusterStateProvider(Collection<String> zkHosts, String chroot) {
zkHost = buildZkHostString(zkHosts,chroot);
}
public ZkClientClusterStateProvider(String zkHost){
this.zkHost = zkHost;
}
@Override
public ClusterState.CollectionRef getState(String collection) {
return zkStateReader.getClusterState().getCollectionRef(collection);
}
@Override
public Set<String> liveNodes() {
return zkStateReader.getClusterState().getLiveNodes();
}
@Override
public String getAlias(String collection) {
Aliases aliases = zkStateReader.getAliases();
return aliases.getCollectionAlias(collection);
}
@Override
public Map<String, Object> getClusterProperties() {
return zkStateReader.getClusterProperties();
}
@Override
public String getCollectionName(String name) {
Aliases aliases = zkStateReader.getAliases();
if (aliases != null) {
Map<String, String> collectionAliases = aliases.getCollectionAliasMap();
if (collectionAliases != null && collectionAliases.containsKey(name)) {
name = collectionAliases.get(name);
}
}
return name;
}
/**
* Download a named config from Zookeeper to a location on the filesystem
* @param configName the name of the config
* @param downloadPath the path to write config files to
* @throws IOException if an I/O exception occurs
*/
public void downloadConfig(String configName, Path downloadPath) throws IOException {
connect();
zkStateReader.getConfigManager().downloadConfigDir(configName, downloadPath);
}
public void uploadConfig(Path configPath, String configName) throws IOException {
connect();
zkStateReader.getConfigManager().uploadConfigDir(configPath, configName);
}
@Override
public void connect() {
if (zkStateReader == null) {
synchronized (this) {
if (zkStateReader == null) {
ZkStateReader zk = null;
try {
zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
zk.createClusterStateWatchersAndUpdate();
zkStateReader = zk;
log.info("Cluster at {} ready", zkHost);
} catch (InterruptedException e) {
zk.close();
Thread.currentThread().interrupt();
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (KeeperException e) {
zk.close();
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (Exception e) {
if (zk != null) zk.close();
// do not wrap because clients may be relying on the underlying exception being thrown
throw e;
}
}
}
}
}
@Override
public void close() throws IOException {
if (zkStateReader != null) {
synchronized (this) {
if (zkStateReader != null)
zkStateReader.close();
zkStateReader = null;
}
}
}
static String buildZkHostString(Collection<String> zkHosts, String chroot) {
if (zkHosts == null || zkHosts.isEmpty()) {
throw new IllegalArgumentException("Cannot create CloudSearchClient without valid ZooKeeper host; none specified!");
}
StringBuilder zkBuilder = new StringBuilder();
int lastIndexValue = zkHosts.size() - 1;
int i = 0;
for (String zkHost : zkHosts) {
zkBuilder.append(zkHost);
if (i < lastIndexValue) {
zkBuilder.append(",");
}
i++;
}
if (chroot != null) {
if (chroot.startsWith("/")) {
zkBuilder.append(chroot);
} else {
throw new IllegalArgumentException(
"The chroot must start with a forward slash.");
}
}
/* Log the constructed connection string and then initialize. */
final String zkHostString = zkBuilder.toString();
log.debug("Final constructed zkHost string: " + zkHostString);
return zkHostString;
}
@Override
public String toString() {
return zkHost;
}
}

View File

@ -24,6 +24,7 @@ import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
@ -442,13 +443,19 @@ public class ClusterState implements JSONWriter.Writable {
} }
public static class CollectionRef { public static class CollectionRef {
protected final AtomicInteger gets = new AtomicInteger();
private final DocCollection coll; private final DocCollection coll;
public int getCount(){
return gets.get();
}
public CollectionRef(DocCollection coll) { public CollectionRef(DocCollection coll) {
this.coll = coll; this.coll = coll;
} }
public DocCollection get(){ public DocCollection get(){
gets.incrementAndGet();
return coll; return coll;
} }

View File

@ -633,6 +633,7 @@ public class ZkStateReader implements Closeable {
@Override @Override
public DocCollection get() { public DocCollection get() {
gets.incrementAndGet();
// TODO: consider limited caching // TODO: consider limited caching
return getCollectionLive(ZkStateReader.this, collName); return getCollectionLive(ZkStateReader.this, collName);
} }
@ -923,6 +924,10 @@ public class ZkStateReader implements Closeable {
* @lucene.experimental * @lucene.experimental
*/ */
public String getBaseUrlForNodeName(final String nodeName) { public String getBaseUrlForNodeName(final String nodeName) {
return getBaseUrlForNodeName(nodeName, getClusterProperty(URL_SCHEME, "http"));
}
public static String getBaseUrlForNodeName(final String nodeName, String urlScheme) {
final int _offset = nodeName.indexOf("_"); final int _offset = nodeName.indexOf("_");
if (_offset < 0) { if (_offset < 0) {
throw new IllegalArgumentException("nodeName does not contain expected '_' seperator: " + nodeName); throw new IllegalArgumentException("nodeName does not contain expected '_' seperator: " + nodeName);
@ -930,7 +935,6 @@ public class ZkStateReader implements Closeable {
final String hostAndPort = nodeName.substring(0,_offset); final String hostAndPort = nodeName.substring(0,_offset);
try { try {
final String path = URLDecoder.decode(nodeName.substring(1+_offset), "UTF-8"); final String path = URLDecoder.decode(nodeName.substring(1+_offset), "UTF-8");
String urlScheme = getClusterProperty(URL_SCHEME, "http");
return urlScheme + "://" + hostAndPort + (path.isEmpty() ? "" : ("/" + path)); return urlScheme + "://" + hostAndPort + (path.isEmpty() ? "" : ("/" + path));
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
throw new IllegalStateException("JVM Does not seem to support UTF-8", e); throw new IllegalStateException("JVM Does not seem to support UTF-8", e);

View File

@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import com.google.common.collect.ImmutableSet;
import org.apache.http.NoHttpResponseException;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.easymock.EasyMock;
import static java.nio.charset.StandardCharsets.UTF_8;
public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
public void testCaching() throws Exception {
String collName = "gettingstarted";
Set<String> livenodes = new HashSet<>();
Map<String, ClusterState.CollectionRef> refs = new HashMap<>();
Map<String, DocCollection> colls = new HashMap<>();
class Ref extends ClusterState.CollectionRef {
private String c;
public Ref(String c) {
super(null);
this.c = c;
}
@Override
public boolean isLazilyLoaded() {
return true;
}
@Override
public DocCollection get() {
gets.incrementAndGet();
return colls.get(c);
}
}
Map<String, Function> responses = new HashMap<>();
NamedList okResponse = new NamedList();
okResponse.add("responseHeader", new NamedList<>(Collections.singletonMap("status", 0)));
LBHttpSolrClient mockLbclient = getMockLbHttpSolrClient(responses);
AtomicInteger lbhttpRequestCount = new AtomicInteger();
try (CloudSolrClient cloudClient = new CloudSolrClient.Builder()
.withLBHttpSolrClient(mockLbclient)
.withClusterStateProvider(getStateProvider(livenodes, refs))
.build()) {
livenodes.addAll(ImmutableSet.of("192.168.1.108:7574_solr", "192.168.1.108:8983_solr"));
ClusterState cs = ClusterState.load(1, coll1State.getBytes(UTF_8),
Collections.emptySet(), "/collections/gettingstarted/state.json");
refs.put(collName, new Ref(collName));
colls.put(collName, cs.getCollectionOrNull(collName));
responses.put("request", o -> {
int i = lbhttpRequestCount.incrementAndGet();
if (i == 1) return new ConnectException("TEST");
if (i == 2) return new SocketException("TEST");
if (i == 3) return new NoHttpResponseException("TEST");
return okResponse;
});
UpdateRequest update = new UpdateRequest()
.add("id", "123", "desc", "Something 0");
cloudClient.request(update, collName);
assertEquals(2, refs.get(collName).getCount());
}
}
private LBHttpSolrClient getMockLbHttpSolrClient(Map<String, Function> responses) throws Exception {
LBHttpSolrClient mockLbclient = EasyMock.createMock(LBHttpSolrClient.class);
EasyMock.reset(mockLbclient);
mockLbclient.request(EasyMock.anyObject(LBHttpSolrClient.Req.class));
EasyMock.expectLastCall().andAnswer(() -> {
LBHttpSolrClient.Req req = (LBHttpSolrClient.Req) EasyMock.getCurrentArguments()[0];
Function f = responses.get("request");
if (f == null) return null;
Object res = f.apply(null);
if (res instanceof Exception) throw (Throwable) res;
LBHttpSolrClient.Rsp rsp = new LBHttpSolrClient.Rsp();
rsp.rsp = (NamedList<Object>) res;
rsp.server = req.servers.get(0);
return rsp;
}).anyTimes();
mockLbclient.getHttpClient();
EasyMock.expectLastCall().andAnswer(() -> null).anyTimes();
EasyMock.replay(mockLbclient);
return mockLbclient;
}
private CloudSolrClient.ClusterStateProvider getStateProvider(Set<String> livenodes,
Map<String, ClusterState.CollectionRef> colls) {
return new CloudSolrClient.ClusterStateProvider() {
@Override
public ClusterState.CollectionRef getState(String collection) {
return colls.get(collection);
}
@Override
public Set<String> liveNodes() {
return livenodes;
}
@Override
public Map<String, Object> getClusterProperties() {
return Collections.EMPTY_MAP;
}
@Override
public String getAlias(String collection) {
return collection;
}
@Override
public String getCollectionName(String name) {
return name;
}
@Override
public void connect() { }
@Override
public void close() throws IOException {
}
};
}
private String coll1State = "{'gettingstarted':{\n" +
" 'replicationFactor':'2',\n" +
" 'router':{'name':'compositeId'},\n" +
" 'maxShardsPerNode':'2',\n" +
" 'autoAddReplicas':'false',\n" +
" 'shards':{\n" +
" 'shard1':{\n" +
" 'range':'80000000-ffffffff',\n" +
" 'state':'active',\n" +
" 'replicas':{\n" +
" 'core_node2':{\n" +
" 'core':'gettingstarted_shard1_replica1',\n" +
" 'base_url':'http://192.168.1.108:8983/solr',\n" +
" 'node_name':'192.168.1.108:8983_solr',\n" +
" 'state':'active',\n" +
" 'leader':'true'},\n" +
" 'core_node4':{\n" +
" 'core':'gettingstarted_shard1_replica2',\n" +
" 'base_url':'http://192.168.1.108:7574/solr',\n" +
" 'node_name':'192.168.1.108:7574_solr',\n" +
" 'state':'active'}}},\n" +
" 'shard2':{\n" +
" 'range':'0-7fffffff',\n" +
" 'state':'active',\n" +
" 'replicas':{\n" +
" 'core_node1':{\n" +
" 'core':'gettingstarted_shard2_replica1',\n" +
" 'base_url':'http://192.168.1.108:8983/solr',\n" +
" 'node_name':'192.168.1.108:8983_solr',\n" +
" 'state':'active',\n" +
" 'leader':'true'},\n" +
" 'core_node3':{\n" +
" 'core':'gettingstarted_shard2_replica2',\n" +
" 'base_url':'http://192.168.1.108:7574/solr',\n" +
" 'node_name':'192.168.1.108:7574_solr',\n" +
" 'state':'active'}}}}}}";
}