SOLR-4210: Requests to a Collection that does not exist on the receiving node should be proxied to a suitable node.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1450015 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2013-02-26 04:29:56 +00:00
parent d0d2ff9209
commit 56e88400ae
3 changed files with 192 additions and 4 deletions

View File

@ -91,6 +91,9 @@ New Features
* SOLR-4078: Allow custom naming of SolrCloud nodes so that a new host:port
combination can take over for a previous shard. (Mark Miller)
* SOLR-4210: Requests to a Collection that does not exist on the receiving node
should be proxied to a suitable node. (Mark Miller, Po Rui)
Bug Fixes
----------------------

View File

@ -17,13 +17,22 @@
package org.apache.solr.servlet;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@ -33,15 +42,18 @@ import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
@ -50,7 +62,6 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.Config;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
@ -206,11 +217,22 @@ public class SolrDispatchFilter implements Filter
if (core != null) {
// we found a core, update the path
path = path.substring( idx );
} else {
// try the default core
}
// if we couldn't find it locally, look on other nodes
if (core == null && idx > 0) {
String coreUrl = getRemotCoreUrl(cores, corename);
if (coreUrl != null) {
path = path.substring( idx );
remoteQuery(coreUrl + path, req, solrReq, resp);
return;
}
}
// try the default core
if (core == null) {
core = cores.getCore("");
}
// TODO: if we couldn't find it locally, look on other nodes
}
// With a valid core...
@ -305,6 +327,118 @@ public class SolrDispatchFilter implements Filter
chain.doFilter(request, response);
}
private void remoteQuery(String coreUrl, HttpServletRequest req,
SolrQueryRequest solrReq, HttpServletResponse resp) throws IOException {
try {
String urlstr = coreUrl;
String queryString = req.getQueryString();
urlstr += queryString == null ? "" : "?" + queryString;
URL url = new URL(urlstr);
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setRequestMethod(req.getMethod());
con.setUseCaches(true);
con.setDoOutput(true);
con.setDoInput(true);
for (Enumeration e = req.getHeaderNames(); e.hasMoreElements();) {
String headerName = e.nextElement().toString();
con.setRequestProperty(headerName, req.getHeader(headerName));
}
try {
con.connect();
int theByte;
if (req.getMethod().equals("POST")) {
BufferedInputStream bis = new BufferedInputStream(
req.getInputStream());
BufferedOutputStream os = new BufferedOutputStream(
con.getOutputStream());
try {
while ((theByte = bis.read()) != -1) {
os.write(theByte);
}
os.flush();
} finally {
IOUtils.closeQuietly(os);
IOUtils.closeQuietly(bis);
}
}
resp.setStatus(con.getResponseCode());
for (Iterator i = con.getHeaderFields().entrySet().iterator(); i
.hasNext();) {
Map.Entry mapEntry = (Map.Entry) i.next();
if (mapEntry.getKey() != null) resp.setHeader(mapEntry.getKey()
.toString(), ((List) mapEntry.getValue()).get(0).toString());
}
resp.setCharacterEncoding(con.getContentEncoding());
resp.setContentType(con.getContentType());
BufferedInputStream bis = new BufferedInputStream(con.getInputStream());
ServletOutputStream os = resp.getOutputStream();
try {
while ((theByte = bis.read()) != -1) {
os.write(theByte);
}
os.flush();
} finally {
IOUtils.closeQuietly(os);
IOUtils.closeQuietly(bis);
}
} finally {
con.disconnect();
}
} catch (IOException e) {
// return exception
resp.getWriter().println(e.getStackTrace().toString());
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"This node forword query failed: " + coreUrl);
}
}
private String getRemotCoreUrl(CoreContainer cores, String collectionName) {
ClusterState clusterState = cores.getZkController().getClusterState();
Collection<Slice> slices = clusterState.getSlices(collectionName);
if (slices == null) {
Set<String> collections = clusterState.getCollections();
for (String collection : collections) {
slices = new ArrayList<Slice>();
slices.addAll(clusterState.getSlices(collection));
}
}
Set<String> liveNodes = clusterState.getLiveNodes();
Iterator<Slice> it = slices.iterator();
while (it.hasNext()) {
Slice slice = it.next();
Map<String,Replica> sliceShards = slice.getReplicasMap();
for (ZkNodeProps nodeProps : sliceShards.values()) {
ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
if (liveNodes.contains(coreNodeProps.getNodeName())
&& coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) {
String coreUrl = coreNodeProps.getCoreUrl();
if (coreUrl.endsWith("/")) {
coreUrl = coreUrl.substring(0, coreUrl.length() - 1);
}
if (coreNodeProps.getBaseUrl().equals(cores.getZkController().getBaseUrl())) {
// don't count a local core
return null;
}
return coreUrl;
}
}
}
return null;
}
private SolrCore getCoreByCollection(CoreContainer cores, String corename, String path) {
String collection = corename;
ZkStateReader zkStateReader = cores.getZkController().getZkStateReader();

View File

@ -34,6 +34,7 @@ import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
@ -55,6 +56,8 @@ import org.junit.BeforeClass;
* work as expected.
*/
public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
private static final String ONE_NODE_COLLECTION = "onenodecollection";
@BeforeClass
public static void beforeThisClass2() throws Exception {
// TODO: we use an fs based dir because something
@ -79,6 +82,8 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
handle.put("QTime", SKIPVAL);
handle.put("timestamp", SKIPVAL);
testNodeWithoutCollectionForwarding();
indexr(id, 1, i1, 100, tlong, 100, t1,
"now is the time for all good men", "foo_f", 1.414f, "foo_b", "true",
"foo_d", 1.414d);
@ -154,6 +159,52 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
}
private void testNodeWithoutCollectionForwarding() throws Exception,
SolrServerException, IOException {
try {
final String baseUrl = ((HttpSolrServer) clients.get(0)).getBaseURL().substring(
0,
((HttpSolrServer) clients.get(0)).getBaseURL().length()
- DEFAULT_COLLECTION.length() - 1);
HttpSolrServer server = new HttpSolrServer(baseUrl);
server.setConnectionTimeout(15000);
server.setSoTimeout(30000);
Create createCmd = new Create();
createCmd.setRoles("none");
createCmd.setCoreName(ONE_NODE_COLLECTION + "core");
createCmd.setCollection(ONE_NODE_COLLECTION);
createCmd.setNumShards(1);
createCmd.setDataDir(dataDir.getAbsolutePath() + File.separator
+ ONE_NODE_COLLECTION);
server.request(createCmd);
} catch (Exception e) {
e.printStackTrace();
//fail
}
waitForRecoveriesToFinish(ONE_NODE_COLLECTION, cloudClient.getZkStateReader(), false);
final String baseUrl2 = ((HttpSolrServer) clients.get(1)).getBaseURL().substring(
0,
((HttpSolrServer) clients.get(1)).getBaseURL().length()
- DEFAULT_COLLECTION.length() - 1);
HttpSolrServer qclient = new HttpSolrServer(baseUrl2 + "/onenodecollection" + "core");
// add a doc
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "1");
qclient.add(doc);
qclient.commit();
SolrQuery query = new SolrQuery("*:*");
QueryResponse results = qclient.query(query);
assertEquals(1, results.getResults().getNumFound());
qclient = new HttpSolrServer(baseUrl2 + "/onenodecollection");
results = qclient.query(query);
assertEquals(1, results.getResults().getNumFound());
}
private long testUpdateAndDelete() throws Exception {
long docId = 99999999L;
indexr("id", docId, t1, "originalcontent");