SOLR-3558 change SnapPuller to use solrj

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1353889 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sami Siren 2012-06-26 09:28:03 +00:00
parent 82533898f2
commit 0cf9ac0833
7 changed files with 129 additions and 151 deletions

View File

@ -137,7 +137,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
}
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl + "replication");
solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
if (isClosed()) retries = INTERRUPTED;
boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making sure force=true does not download files we already have

View File

@ -42,7 +42,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@ -602,10 +601,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
if (showSlaveDetails && snapPuller != null) {
Properties props = loadReplicationProperties();
try {
NamedList<String> command = new NamedList<String>();
command.add(COMMAND, CMD_DETAILS);
command.add("slave", "false");
NamedList nl = snapPuller.getCommandResponse(command);
NamedList nl = snapPuller.getDetails();
slave.add("masterDetails", nl.get(CMD_DETAILS));
} catch (Exception e) {
LOG.warn("Exception while invoking 'details' method for replication on master ", e);

View File

@ -17,22 +17,19 @@
package org.apache.solr.handler;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.message.AbstractHttpMessage;
import org.apache.http.message.BasicNameValuePair;
import org.apache.lucene.index.IndexCommit;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.util.FileUtils;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
@ -57,7 +54,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;
/**
@ -116,13 +112,14 @@ public class SnapPuller {
// HttpClient for this instance if connectionTimeout or readTimeout has been specified
private final HttpClient myHttpClient;
private static synchronized HttpClient createHttpClient(String connTimeout, String readTimeout, String httpBasicAuthUser, String httpBasicAuthPassword) {
private static synchronized HttpClient createHttpClient(String connTimeout, String readTimeout, String httpBasicAuthUser, String httpBasicAuthPassword, boolean useCompression) {
if (connTimeout == null && readTimeout == null && client != null) return client;
final ModifiableSolrParams httpClientParams = new ModifiableSolrParams();
httpClientParams.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connTimeout != null ? connTimeout : "5000");
httpClientParams.set(HttpClientUtil.PROP_SO_TIMEOUT, readTimeout != null ? readTimeout : "20000");
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_USER, httpBasicAuthUser);
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
// Keeping a very high number so that if you have a large number of cores
// no requests are kept waiting for an idle connection.
httpClientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
@ -132,13 +129,19 @@ public class SnapPuller {
return httpClient;
}
public SnapPuller(NamedList initArgs, ReplicationHandler handler, SolrCore sc) {
public SnapPuller(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {
solrCore = sc;
SolrParams params = SolrParams.toSolrParams(initArgs);
masterUrl = (String) initArgs.get(MASTER_URL);
final SolrParams params = SolrParams.toSolrParams(initArgs);
String masterUrl = (String) initArgs.get(MASTER_URL);
if (masterUrl == null)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"'masterUrl' is required for a slave");
if (masterUrl.endsWith("/replication")) {
masterUrl = masterUrl.substring(0, masterUrl.length()-12);
LOG.warn("'masterUrl' must be specified without the /replication suffix");
}
this.masterUrl = masterUrl;
this.replicationHandler = handler;
pollIntervalStr = (String) initArgs.get(POLL_INTERVAL);
pollInterval = readInterval(pollIntervalStr);
@ -149,7 +152,7 @@ public class SnapPuller {
String readTimeout = (String) initArgs.get(HttpClientUtil.PROP_SO_TIMEOUT);
String httpBasicAuthUser = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
String httpBasicAuthPassword = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
myHttpClient = createHttpClient(connTimeout, readTimeout, httpBasicAuthUser, httpBasicAuthPassword);
myHttpClient = createHttpClient(connTimeout, readTimeout, httpBasicAuthUser, httpBasicAuthPassword, useExternal);
if (pollInterval != null && pollInterval > 0) {
startExecutorService();
} else {
@ -183,82 +186,49 @@ public class SnapPuller {
*/
@SuppressWarnings("unchecked")
NamedList getLatestVersion() throws IOException {
HttpPost post = new HttpPost(masterUrl);
List<BasicNameValuePair> formparams = new ArrayList<BasicNameValuePair>();
formparams.add(new BasicNameValuePair("wt", "javabin"));
formparams.add(new BasicNameValuePair(COMMAND, CMD_INDEX_VERSION));
UrlEncodedFormEntity entity = new UrlEncodedFormEntity(formparams, "UTF-8");
post.setEntity(entity);
return getNamedListResponse(post);
}
NamedList getCommandResponse(NamedList<String> commands) throws IOException {
HttpPost post = new HttpPost(masterUrl);
List<BasicNameValuePair> formparams = new ArrayList<BasicNameValuePair>();
formparams.add(new BasicNameValuePair("wt", "javabin"));
for (Map.Entry<String, String> c : commands) {
formparams.add(new BasicNameValuePair(c.getKey(), c.getValue()));
}
UrlEncodedFormEntity entity = new UrlEncodedFormEntity(formparams, "UTF-8");
post.setEntity(entity);
return getNamedListResponse(post);
}
private NamedList<?> getNamedListResponse(HttpPost method) throws IOException {
InputStream input = null;
NamedList<?> result = null;
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COMMAND, CMD_INDEX_VERSION);
params.set(CommonParams.WT, "javabin");
params.set(CommonParams.QT, "/replication");
QueryRequest req = new QueryRequest(params);
SolrServer server = new HttpSolrServer(masterUrl, myHttpClient); //XXX modify to use shardhandler
try {
HttpResponse response = myHttpClient.execute(method);
int status = response.getStatusLine().getStatusCode();
if (status != HttpStatus.SC_OK) {
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
"Request failed for the url " + method);
}
input = response.getEntity().getContent();
result = (NamedList<?>)new JavaBinCodec().unmarshal(input);
} finally {
try {
if (input != null) {
input.close();
}
} catch (Exception e) {
}
return server.request(req);
} catch (SolrServerException e) {
throw new IOException(e);
}
return result;
}
/**
* Fetches the list of files in a given index commit point
* Fetches the list of files in a given index commit point and updates internal list of files to download.
*/
void fetchFileList(long gen) throws IOException {
HttpPost post = new HttpPost(masterUrl);
private void fetchFileList(long gen) throws IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COMMAND, CMD_GET_FILE_LIST);
params.set(GENERATION, String.valueOf(gen));
params.set(CommonParams.WT, "javabin");
params.set(CommonParams.QT, "/replication");
QueryRequest req = new QueryRequest(params);
SolrServer server = new HttpSolrServer(masterUrl, myHttpClient); //XXX modify to use shardhandler
List<BasicNameValuePair> formparams = new ArrayList<BasicNameValuePair>();
formparams.add(new BasicNameValuePair("wt", "javabin"));
formparams.add(new BasicNameValuePair(COMMAND, CMD_GET_FILE_LIST));
formparams.add(new BasicNameValuePair(GENERATION, String.valueOf(gen)));
try {
NamedList response = server.request(req);
UrlEncodedFormEntity entity = new UrlEncodedFormEntity(formparams, "UTF-8");
post.setEntity(entity);
List<Map<String, Object>> files = (List<Map<String,Object>>) response.get(CMD_GET_FILE_LIST);
if (files != null)
filesToDownload = Collections.synchronizedList(files);
else {
filesToDownload = Collections.emptyList();
LOG.error("No files to download for index generation: "+ gen);
}
@SuppressWarnings("unchecked")
NamedList<List<Map<String, Object>>> nl
= (NamedList<List<Map<String, Object>>>) getNamedListResponse(post);
files = (List<Map<String,Object>>) response.get(CONF_FILES);
if (files != null)
confFilesToDownload = Collections.synchronizedList(files);
List<Map<String, Object>> f = nl.get(CMD_GET_FILE_LIST);
if (f != null)
filesToDownload = Collections.synchronizedList(f);
else {
filesToDownload = Collections.emptyList();
LOG.error("No files to download for index generation: "+ gen);
} catch (SolrServerException e) {
throw new IOException(e);
}
f = nl.get(CONF_FILES);
if (f != null)
confFilesToDownload = Collections.synchronizedList(f);
}
/**
@ -270,7 +240,7 @@ public class SnapPuller {
* @throws IOException if an exception occurs
*/
@SuppressWarnings("unchecked")
boolean successfulInstall = false;
private boolean successfulInstall = false;
boolean fetchLatestIndex(SolrCore core, boolean force) throws IOException, InterruptedException {
successfulInstall = false;
@ -953,8 +923,6 @@ public class SnapPuller {
private boolean isConf;
private HttpPost post;
private boolean aborted = false;
private Long indexGen;
@ -1130,79 +1098,62 @@ public class SnapPuller {
* Open a new stream using HttpClient
*/
FastInputStream getStream() throws IOException {
post = new HttpPost(masterUrl);
//the method is command=filecontent
List<BasicNameValuePair> formparams = new ArrayList<BasicNameValuePair>();
formparams.add(new BasicNameValuePair(COMMAND, CMD_GET_FILE));
SolrServer s = new HttpSolrServer(masterUrl, myHttpClient, null); //XXX use shardhandler
ModifiableSolrParams params = new ModifiableSolrParams();
// //the method is command=filecontent
params.set(COMMAND, CMD_GET_FILE);
params.set(GENERATION, Long.toString(indexGen));
params.set(CommonParams.QT, "/replication");
//add the version to download. This is used to reserve the download
formparams.add(new BasicNameValuePair(GENERATION, indexGen.toString()));
if (isConf) {
//set cf instead of file for config file
formparams.add(new BasicNameValuePair(CONF_FILE_SHORT, fileName));
params.set(CONF_FILE_SHORT, fileName);
} else {
formparams.add(new BasicNameValuePair(FILE, fileName));
params.set(FILE, fileName);
}
if (useInternal) {
formparams.add(new BasicNameValuePair(COMPRESSION, "true"));
}
if (useExternal) {
formparams.add(new BasicNameValuePair("Accept-Encoding", "gzip,deflate"));
params.set(COMPRESSION, "internal");
}
//use checksum
if (this.includeChecksum)
formparams.add(new BasicNameValuePair(CHECKSUM, "true"));
if (this.includeChecksum) {
params.set(CHECKSUM, true);
}
//wt=filestream this is a custom protocol
formparams.add(new BasicNameValuePair("wt", FILE_STREAM));
// This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
// the server starts from the offset
params.set(CommonParams.WT, FILE_STREAM);
// This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
// the server starts from the offset
if (bytesDownloaded > 0) {
formparams.add(new BasicNameValuePair(OFFSET, "" + bytesDownloaded));
params.set(OFFSET, Long.toString(bytesDownloaded));
}
UrlEncodedFormEntity entity = new UrlEncodedFormEntity(formparams, "UTF-8");
post.setEntity(entity);
HttpResponse response = myHttpClient.execute(post);
InputStream is = response.getEntity().getContent();
//wrap it using FastInputStream
if (useInternal) {
is = new InflaterInputStream(is);
} else if (useExternal) {
is = checkCompressed(post, is);
NamedList response;
InputStream is = null;
try {
QueryRequest req = new QueryRequest(params);
response = s.request(req);
is = (InputStream) response.get("stream");
if(useInternal) {
is = new InflaterInputStream(is);
}
return new FastInputStream(is);
} catch (Throwable t) {
//close stream on error
IOUtils.closeQuietly(is);
throw new IOException("Could not download file '" + fileName + "'", t);
}
return new FastInputStream(is);
}
}
/*
* This is copied from CommonsHttpSolrServer
*/
private InputStream checkCompressed(AbstractHttpMessage method, InputStream respBody) throws IOException {
Header contentEncodingHeader = method.getFirstHeader("Content-Encoding");
if (contentEncodingHeader != null) {
String contentEncoding = contentEncodingHeader.getValue();
if (contentEncoding.contains("gzip")) {
respBody = new GZIPInputStream(respBody);
} else if (contentEncoding.contains("deflate")) {
respBody = new InflaterInputStream(respBody);
}
} else {
Header contentTypeHeader = method.getFirstHeader("Content-Type");
if (contentTypeHeader != null) {
String contentType = contentTypeHeader.getValue();
if (contentType != null) {
if (contentType.startsWith("application/x-gzip-compressed")) {
respBody = new GZIPInputStream(respBody);
} else if (contentType.startsWith("application/x-deflate")) {
respBody = new InflaterInputStream(respBody);
}
}
}
}
return respBody;
NamedList getDetails() throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COMMAND, CMD_DETAILS);
params.set("slave", false);
params.set(CommonParams.QT, "/replication");
SolrServer server = new HttpSolrServer(masterUrl, myHttpClient); //XXX use shardhandler
QueryRequest request = new QueryRequest(params);
return server.request(request);
}
static Integer readInterval(String interval) {

View File

@ -51,7 +51,7 @@
<requestHandler name="/replication" class="solr.ReplicationHandler">
<lst name="slave">
<str name="masterUrl">http://localhost:TEST_PORT/solr/replication</str>
<str name="masterUrl">http://localhost:TEST_PORT/solr</str>
<str name="pollInterval">00:00:01</str>
</lst>
</requestHandler>

View File

@ -1089,7 +1089,7 @@
<str name="confFiles">schema.xml,stopwords.txt</str>
</lst>
<lst name="slave">
<str name="masterUrl">http://localhost:8983/solr/replication</str>
<str name="masterUrl">http://localhost:8983/solr</str>
<str name="pollInterval">00:00:60</str>
</lst>
</requestHandler>

View File

@ -201,8 +201,10 @@ public class HttpSolrServer extends SolrServer {
// The parser 'wt=' and 'version=' params are used instead of the original
// params
ModifiableSolrParams wparams = new ModifiableSolrParams(params);
wparams.set(CommonParams.WT, parser.getWriterType());
wparams.set(CommonParams.VERSION, parser.getVersion());
if (parser != null) {
wparams.set(CommonParams.WT, parser.getWriterType());
wparams.set(CommonParams.VERSION, parser.getVersion());
}
if (invariantParams != null) {
wparams.add(invariantParams);
}
@ -352,7 +354,6 @@ public class HttpSolrServer extends SolrServer {
int httpStatus = response.getStatusLine().getStatusCode();
// Read the contents
String charset = EntityUtils.getContentCharSet(response.getEntity());
respBody = response.getEntity().getContent();
// handle some http level checks before trying to parse the response
@ -377,6 +378,13 @@ public class HttpSolrServer extends SolrServer {
+ response.getStatusLine().getReasonPhrase());
}
if (processor == null) {
// no processor specified, return raw stream
NamedList<Object> rsp = new NamedList<Object>();
rsp.add("stream", respBody);
return rsp;
}
String charset = EntityUtils.getContentCharSet(response.getEntity());
NamedList<Object> rsp = processor.processResponse(respBody, charset);
if (httpStatus != HttpStatus.SC_OK) {
String reason = null;
@ -409,7 +417,7 @@ public class HttpSolrServer extends SolrServer {
throw new SolrServerException(
"IOException occured when talking to server at: " + getBaseURL(), e);
} finally {
if (respBody != null) {
if (respBody != null && processor!=null) {
try {
respBody.close();
} catch (Throwable t) {} // ignore

View File

@ -18,6 +18,7 @@
package org.apache.solr.client.solrj.impl;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.Socket;
import java.util.Enumeration;
@ -38,10 +39,12 @@ import org.apache.solr.SolrJettyTestBase;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest.METHOD;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.util.ExternalPaths;
import org.junit.BeforeClass;
import org.junit.Test;
@ -134,6 +137,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase {
} catch (SolrServerException e) {
assertTrue(e.getMessage().contains("refused"));
}
server.shutdown();
}
@Test
@ -148,6 +152,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase {
} catch (SolrServerException e) {
assertTrue(e.getMessage().contains("Timeout"));
}
server.shutdown();
}
@Test
@ -234,6 +239,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase {
assertEquals("keep-alive", DebugServlet.headers.get("Connection"));
assertEquals("application/x-www-form-urlencoded; charset=UTF-8", DebugServlet.headers.get("Content-Type"));
assertEquals("UTF-8", DebugServlet.headers.get("Content-Charset"));
server.shutdown();
}
@Test
@ -274,6 +280,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase {
assertEquals(server.getParser().getVersion(), DebugServlet.parameters.get(CommonParams.VERSION)[0]);
assertEquals("Solr[" + org.apache.solr.client.solrj.impl.HttpSolrServer.class.getName() + "] 1.0", DebugServlet.headers.get("User-Agent"));
assertEquals("keep-alive", DebugServlet.headers.get("Connection"));
server.shutdown();
}
@Test
@ -335,6 +342,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase {
assertEquals("application/javabin", DebugServlet.headers.get("Content-Type"));
assertEquals(1, DebugServlet.parameters.get("a").length);
assertEquals("\u1234", DebugServlet.parameters.get("a")[0]);
server.shutdown();
}
@Test
@ -355,6 +363,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase {
} catch (Throwable t) {
fail("Exception was thrown:" + t);
}
server.shutdown();
}
@Test
@ -406,6 +415,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase {
q = new SolrQuery("foo");
QueryResponse response = server.query(q);
assertEquals(0, response.getStatus());
server.shutdown();
}
@Test
@ -420,8 +430,21 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase {
server.setDefaultMaxConnectionsPerHost(1);
fail("Operation should not succeed.");
} catch (UnsupportedOperationException e) {}
client.getConnectionManager().shutdown();
}
@Test
public void testGetRawStream() throws SolrServerException, IOException{
HttpClient client = HttpClientUtil.createClient(null);
HttpSolrServer server = new HttpSolrServer("http://127.0.0.1:" + jetty.getLocalPort() + "/solr", client, null);
QueryRequest req = new QueryRequest();
NamedList response = server.request(req);
InputStream stream = (InputStream)response.get("stream");
assertNotNull(stream);
stream.close();
client.getConnectionManager().shutdown();
}
private int findUnusedPort() {
for (int port = 0; port < 65535; port++) {
Socket s = new Socket();