mirror of https://github.com/apache/lucene.git
SOLR-844 -- A SolrServer implementation to front-end multiple solr servers and provides load balancing and failover support
git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@756381 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b53c3495e4
commit
319f33973d
|
@ -186,6 +186,9 @@ New Features
|
|||
|
||||
31. SOLR-1038: Enhance CommonsHttpSolrServer to add docs in batch using an iterator API (Noble Paul via shalin)
|
||||
|
||||
32. SOLR-844: A SolrServer implementation to front-end multiple solr servers and provides load balancing and failover
|
||||
support (Noble Paul, Mark Miller, hossman via shalin)
|
||||
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
|
|
@ -0,0 +1,320 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.client.solrj.impl;
|
||||
|
||||
import org.apache.commons.httpclient.HttpClient;
|
||||
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
|
||||
import org.apache.solr.client.solrj.*;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.core.SolrException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* LBHttpSolrServer or "LoadBalanced HttpSolrServer" is just a wrapper to CommonsHttpSolrServer. This is useful when you
|
||||
* have multiple SolrServers and the requests need to be Load Balanced among them. This should <b>NOT</b> be used for
|
||||
* indexing. Also see the <a href="http://wiki.apache.org/solr/LBHttpSolrServer">wiki</a> page.
|
||||
* <p/>
|
||||
* It offers automatic failover when a server goes down and it detects when the server comes back up.
|
||||
* <p/>
|
||||
* Load balancing is done using a simple roundrobin on the list of servers.
|
||||
* <p/>
|
||||
* If a request to a server fails by an IOException due to a connection timeout or read timeout then the host is taken
|
||||
* off the list of live servers and moved to a 'dead server list' and the request is resent to the next live server.
|
||||
* This process is continued till it tries all the live servers. If atleast one server is alive, the request succeeds,
|
||||
* andif not it fails.
|
||||
* <blockquote><pre>
|
||||
* SolrServer lbHttpSolrServer = new LBHttpSolrServer("http://host1:8080/solr/","http://host2:8080/solr","http://host2:8080/solr");
|
||||
* //or if you wish to pass the HttpClient do as follows
|
||||
* httpClient httpClient = new HttpClient();
|
||||
* SolrServer lbHttpSolrServer = new LBHttpSolrServer(httpClient,"http://host1:8080/solr/","http://host2:8080/solr","http://host2:8080/solr");
|
||||
* </pre></blockquote>
|
||||
* This detects if a dead server comes alive automatically. The check is done in fixed intervals in a dedicated thread.
|
||||
* This interval can be set using {@see #setAliveCheckInterval} , the default is set to one minute.
|
||||
* <p/>
|
||||
* <b>When to use this?</b><br/> This can be used as a software load balancer when you do not wish to setup an external
|
||||
* load balancer. The code is relatively new and the API is currently experimental. Alternatives to this code are to use
|
||||
* a dedicated hardware load balancer or using Apache httpd with mod_proxy_balancer as a load balancer. See <a
|
||||
* href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load balancing on Wikipedia</a>
|
||||
*
|
||||
* @version $Id$
|
||||
* @since solr 1.4
|
||||
*/
|
||||
public class LBHttpSolrServer extends SolrServer {
|
||||
private final CopyOnWriteArrayList<ServerWrapper> aliveServers = new CopyOnWriteArrayList<ServerWrapper>();
|
||||
private final CopyOnWriteArrayList<ServerWrapper> zombieServers = new CopyOnWriteArrayList<ServerWrapper>();
|
||||
private ScheduledExecutorService aliveCheckExecutor;
|
||||
|
||||
private HttpClient httpClient;
|
||||
private final AtomicInteger counter = new AtomicInteger(-1);
|
||||
|
||||
private ReentrantLock checkLock = new ReentrantLock();
|
||||
private static final SolrQuery solrQuery = new SolrQuery("*:*");
|
||||
|
||||
static {
|
||||
solrQuery.setRows(0);
|
||||
}
|
||||
|
||||
private static class ServerWrapper {
|
||||
final CommonsHttpSolrServer solrServer;
|
||||
|
||||
// Used only by the thread in aliveCheckExecutor
|
||||
long lastUsed, lastChecked;
|
||||
|
||||
int failedPings = 0;
|
||||
|
||||
public ServerWrapper(CommonsHttpSolrServer solrServer) {
|
||||
this.solrServer = solrServer;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return solrServer.getBaseURL();
|
||||
}
|
||||
}
|
||||
|
||||
public LBHttpSolrServer(String... solrServerUrls) throws MalformedURLException {
|
||||
this(new HttpClient(new MultiThreadedHttpConnectionManager()), solrServerUrls);
|
||||
}
|
||||
|
||||
public LBHttpSolrServer(HttpClient httpClient, String... solrServerUrl)
|
||||
throws MalformedURLException {
|
||||
this(httpClient, new BinaryResponseParser(), solrServerUrl);
|
||||
}
|
||||
|
||||
public LBHttpSolrServer(HttpClient httpClient, ResponseParser parser, String... solrServerUrl)
|
||||
throws MalformedURLException {
|
||||
this.httpClient = httpClient;
|
||||
for (String s : solrServerUrl) {
|
||||
aliveServers.add(new ServerWrapper(new CommonsHttpSolrServer(s, httpClient, parser)));
|
||||
}
|
||||
}
|
||||
|
||||
public void addSolrServer(String server) throws MalformedURLException {
|
||||
CommonsHttpSolrServer solrServer = new CommonsHttpSolrServer(server, httpClient);
|
||||
checkLock.lock();
|
||||
try {
|
||||
aliveServers.add(new ServerWrapper(solrServer));
|
||||
} finally {
|
||||
checkLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public String removeSolrServer(String server) {
|
||||
try {
|
||||
server = new URL(server).toExternalForm();
|
||||
} catch (MalformedURLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if (server.endsWith("/")) {
|
||||
server = server.substring(0, server.length() - 1);
|
||||
}
|
||||
this.checkLock.lock();
|
||||
try {
|
||||
for (ServerWrapper serverWrapper : aliveServers) {
|
||||
if (serverWrapper.solrServer.getBaseURL().equals(server)) {
|
||||
aliveServers.remove(serverWrapper);
|
||||
return serverWrapper.solrServer.getBaseURL();
|
||||
}
|
||||
}
|
||||
if (zombieServers.isEmpty()) return null;
|
||||
|
||||
for (ServerWrapper serverWrapper : zombieServers) {
|
||||
if (serverWrapper.solrServer.getBaseURL().equals(server)) {
|
||||
zombieServers.remove(serverWrapper);
|
||||
return serverWrapper.solrServer.getBaseURL();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
checkLock.unlock();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void setConnectionTimeout(int timeout) {
|
||||
httpClient.getHttpConnectionManager().getParams().setConnectionTimeout(timeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* set connectionManagerTimeout on the HttpClient.*
|
||||
*/
|
||||
public void setConnectionManagerTimeout(int timeout) {
|
||||
httpClient.getParams().setConnectionManagerTimeout(timeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* set soTimeout (read timeout) on the underlying HttpConnectionManager. This is desirable for queries, but probably
|
||||
* not for indexing.
|
||||
*/
|
||||
public void setSoTimeout(int timeout) {
|
||||
httpClient.getParams().setSoTimeout(timeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to query a live server. If no live servers are found it throws a SolrServerException. If the request failed
|
||||
* due to IOException then the live server is moved to dead pool and the request is retried on another live server if
|
||||
* available. If all live servers are exhausted then a SolrServerException is thrown.
|
||||
*
|
||||
* @param request the SolrRequest.
|
||||
*
|
||||
* @return response
|
||||
*
|
||||
* @throws SolrServerException
|
||||
* @throws IOException
|
||||
*/
|
||||
public NamedList<Object> request(final SolrRequest request)
|
||||
throws SolrServerException, IOException {
|
||||
int count = counter.incrementAndGet();
|
||||
int attempts = 0;
|
||||
Exception ex;
|
||||
while (true) {
|
||||
int size = aliveServers.size();
|
||||
if (size < 1) throw new SolrServerException("No live SolrServers available to handle this request");
|
||||
ServerWrapper solrServer;
|
||||
try {
|
||||
solrServer = aliveServers.get(count % size);
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
//this list changes dynamically. so it is expected to get IndexOutOfBoundsException
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
return solrServer.solrServer.request(request);
|
||||
} catch (SolrException e) {
|
||||
// Server is alive but the request was malformed or invalid
|
||||
throw e;
|
||||
} catch (SolrServerException e) {
|
||||
if (e.getRootCause() instanceof IOException) {
|
||||
ex = e;
|
||||
moveAliveToDead(solrServer);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new SolrServerException(e);
|
||||
}
|
||||
attempts++;
|
||||
if (attempts >= aliveServers.size())
|
||||
throw new SolrServerException("No live SolrServers available to handle this request", ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes up one dead server and check for aliveness. The check is done in a roundrobin. Each server is checked for
|
||||
* aliveness once in 'x' millis where x is decided by the setAliveCheckinterval() or it is defaulted to 1 minute
|
||||
*
|
||||
* @param zombieServer a server in the dead pool
|
||||
*/
|
||||
private void checkAZombieServer(ServerWrapper zombieServer) {
|
||||
long currTime = System.currentTimeMillis();
|
||||
checkLock.lock();
|
||||
try {
|
||||
zombieServer.lastChecked = currTime;
|
||||
QueryResponse resp = zombieServer.solrServer.query(solrQuery);
|
||||
if (resp.getStatus() == 0) {
|
||||
//server has come back up
|
||||
zombieServer.lastUsed = currTime;
|
||||
zombieServers.remove(zombieServer);
|
||||
aliveServers.add(zombieServer);
|
||||
zombieServer.failedPings = 0;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
zombieServer.failedPings++;
|
||||
//Expected . The server is still down
|
||||
} finally {
|
||||
checkLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void moveAliveToDead(ServerWrapper solrServer) {
|
||||
checkLock.lock();
|
||||
try {
|
||||
boolean result = aliveServers.remove(solrServer);
|
||||
if (result) {
|
||||
if (zombieServers.addIfAbsent(solrServer)) {
|
||||
startAliveCheckExecutor();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
checkLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private int interval = CHECK_INTERVAL;
|
||||
|
||||
/**
|
||||
* LBHttpSolrServer keeps pinging the dead servers at fixed interval to find if it is alive. Use this to set that
|
||||
* interval
|
||||
*
|
||||
* @param interval time in seconds
|
||||
*/
|
||||
public void setAliveCheckInterval(int interval) {
|
||||
if (interval <= 0) {
|
||||
throw new IllegalArgumentException("Alive check interval must be " +
|
||||
"positive, specified value = " + interval);
|
||||
}
|
||||
this.interval = interval;
|
||||
}
|
||||
|
||||
private void startAliveCheckExecutor() {
|
||||
if (aliveCheckExecutor == null) {
|
||||
synchronized (this) {
|
||||
if (aliveCheckExecutor == null) {
|
||||
aliveCheckExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
aliveCheckExecutor.scheduleAtFixedRate(
|
||||
getAliveCheckRunner(new WeakReference<LBHttpSolrServer>(this)),
|
||||
this.interval, this.interval, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static Runnable getAliveCheckRunner(final WeakReference<LBHttpSolrServer> lbHttpSolrServer) {
|
||||
return new Runnable() {
|
||||
public void run() {
|
||||
LBHttpSolrServer solrServer = lbHttpSolrServer.get();
|
||||
if (solrServer != null && solrServer.zombieServers != null) {
|
||||
for (ServerWrapper zombieServer : solrServer.zombieServers) {
|
||||
solrServer.checkAZombieServer(zombieServer);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public HttpClient getHttpClient() {
|
||||
return httpClient;
|
||||
}
|
||||
|
||||
protected void finalize() throws Throwable {
|
||||
try {
|
||||
this.aliveCheckExecutor.shutdownNow();
|
||||
} finally {
|
||||
super.finalize();
|
||||
}
|
||||
}
|
||||
|
||||
private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
|
||||
}
|
|
@ -0,0 +1,202 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.commons.httpclient.HttpClient;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
|
||||
import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.client.solrj.response.UpdateResponse;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.util.AbstractSolrTestCase;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Test for LBHttpSolrServer
|
||||
*
|
||||
* @version $Id$
|
||||
* @since solr 1.4
|
||||
*/
|
||||
public class TestLBHttpSolrServer extends TestCase {
|
||||
SolrInstance[] solr = new SolrInstance[3];
|
||||
HttpClient httpClient = new HttpClient();
|
||||
|
||||
public void setUp() throws Exception {
|
||||
for (int i = 0; i < solr.length; i++) {
|
||||
solr[i] = new SolrInstance("solr" + i, 43000 + i);
|
||||
solr[i].setUp();
|
||||
solr[i].startJetty();
|
||||
addDocs(solr[i]);
|
||||
}
|
||||
}
|
||||
|
||||
private void addDocs(SolrInstance solrInstance) throws IOException, SolrServerException {
|
||||
List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
doc.addField("id", i);
|
||||
doc.addField("name", solrInstance.name);
|
||||
docs.add(doc);
|
||||
}
|
||||
CommonsHttpSolrServer solrServer = new CommonsHttpSolrServer(solrInstance.getUrl(), httpClient);
|
||||
UpdateResponse resp = solrServer.add(docs);
|
||||
assertEquals(0, resp.getStatus());
|
||||
resp = solrServer.commit();
|
||||
assertEquals(0, resp.getStatus());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
for (SolrInstance aSolr : solr) {
|
||||
aSolr.tearDown();
|
||||
}
|
||||
}
|
||||
|
||||
public void testSimple() throws Exception {
|
||||
String[] s = new String[solr.length];
|
||||
for (int i = 0; i < solr.length; i++) {
|
||||
s[i] = solr[i].getUrl();
|
||||
}
|
||||
LBHttpSolrServer lbHttpSolrServer = new LBHttpSolrServer(httpClient, s);
|
||||
lbHttpSolrServer.setAliveCheckInterval(1);
|
||||
SolrQuery solrQuery = new SolrQuery("*:*");
|
||||
Set<String> names = new HashSet<String>();
|
||||
QueryResponse resp = null;
|
||||
for (String value : s) {
|
||||
resp = lbHttpSolrServer.query(solrQuery);
|
||||
assertEquals(10, resp.getResults().getNumFound());
|
||||
names.add(resp.getResults().get(0).getFieldValue("name").toString());
|
||||
}
|
||||
assertEquals(3, names.size());
|
||||
|
||||
// Kill a server and test again
|
||||
solr[1].jetty.stop();
|
||||
solr[1].jetty = null;
|
||||
names.clear();
|
||||
for (String value : s) {
|
||||
resp = lbHttpSolrServer.query(solrQuery);
|
||||
assertEquals(10, resp.getResults().getNumFound());
|
||||
names.add(resp.getResults().get(0).getFieldValue("name").toString());
|
||||
}
|
||||
assertEquals(2, names.size());
|
||||
assertFalse(names.contains("solr1"));
|
||||
|
||||
// Start the killed server once again
|
||||
solr[1].startJetty();
|
||||
// Wait for the alive check to complete
|
||||
Thread.sleep(1200);
|
||||
names.clear();
|
||||
for (String value : s) {
|
||||
resp = lbHttpSolrServer.query(solrQuery);
|
||||
assertEquals(10, resp.getResults().getNumFound());
|
||||
names.add(resp.getResults().get(0).getFieldValue("name").toString());
|
||||
}
|
||||
assertEquals(3, names.size());
|
||||
}
|
||||
|
||||
|
||||
private class SolrInstance extends AbstractSolrTestCase {
|
||||
|
||||
String name;
|
||||
File homeDir;
|
||||
File confDir;
|
||||
int port;
|
||||
JettySolrRunner jetty;
|
||||
|
||||
public SolrInstance(String name, int port) {
|
||||
this.name = name;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
public String getHomeDir() {
|
||||
return homeDir.toString();
|
||||
}
|
||||
|
||||
public String getUrl() {
|
||||
return "http://localhost:" + port + "/solr";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSchemaFile() {
|
||||
return "." + File.separator + "solr" + File.separator + "conf" + File.separator + "schema-replication1.xml";
|
||||
}
|
||||
|
||||
public String getConfDir() {
|
||||
return confDir.toString();
|
||||
}
|
||||
|
||||
public String getDataDir() {
|
||||
return dataDir.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSolrConfigFile() {
|
||||
String fname = "";
|
||||
fname = "." + File.separator + "solr" + File.separator + "conf" + File.separator + "solrconfig-slave1.xml";
|
||||
return fname;
|
||||
}
|
||||
|
||||
public void setUp() throws Exception {
|
||||
String home = System.getProperty("java.io.tmpdir")
|
||||
+ File.separator
|
||||
+ getClass().getName() + "-" + System.currentTimeMillis();
|
||||
|
||||
|
||||
homeDir = new File(home, name);
|
||||
dataDir = new File(homeDir, "data");
|
||||
confDir = new File(homeDir, "conf");
|
||||
|
||||
homeDir.mkdirs();
|
||||
dataDir.mkdirs();
|
||||
confDir.mkdirs();
|
||||
|
||||
File f = new File(confDir, "solrconfig.xml");
|
||||
FileUtils.copyFile(new File(getSolrConfigFile()), f);
|
||||
f = new File(confDir, "schema.xml");
|
||||
FileUtils.copyFile(new File(getSchemaFile()), f);
|
||||
|
||||
}
|
||||
|
||||
public void tearDown() throws Exception {
|
||||
try {
|
||||
jetty.stop();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
super.tearDown();
|
||||
AbstractSolrTestCase.recurseDelete(homeDir);
|
||||
}
|
||||
|
||||
public void startJetty() throws Exception {
|
||||
jetty = new JettySolrRunner("/solr", port);
|
||||
System.setProperty("solr.solr.home", getHomeDir());
|
||||
System.setProperty("solr.data.dir", getDataDir());
|
||||
jetty.start();
|
||||
// System.out.println("waiting.........");
|
||||
// Thread.sleep(5000);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue