From 0cf9ac0833aab766919eda2d2dc9584b815e4147 Mon Sep 17 00:00:00 2001 From: Sami Siren Date: Tue, 26 Jun 2012 09:28:03 +0000 Subject: [PATCH] SOLR-3558 change SnapPuller to use solrj git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1353889 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/solr/cloud/RecoveryStrategy.java | 2 +- .../solr/handler/ReplicationHandler.java | 6 +- .../org/apache/solr/handler/SnapPuller.java | 227 +++++++----------- .../test-files/solr/conf/solrconfig-slave.xml | 2 +- solr/example/solr/conf/solrconfig.xml | 2 +- .../client/solrj/impl/HttpSolrServer.java | 16 +- .../solrj/impl/BasicHttpSolrServerTest.java | 25 +- 7 files changed, 129 insertions(+), 151 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 06a4401d3f0..36ffb9fed81 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -137,7 +137,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread { } ModifiableSolrParams solrParams = new ModifiableSolrParams(); - solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl + "replication"); + solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl); if (isClosed()) retries = INTERRUPTED; boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making sure force=true does not download files we already have diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java index a9b4e8f5b62..226b9e43ac9 100644 --- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java @@ -42,7 +42,6 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexDeletionPolicy; -import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.DirectoryReader; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; @@ -602,10 +601,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw if (showSlaveDetails && snapPuller != null) { Properties props = loadReplicationProperties(); try { - NamedList command = new NamedList(); - command.add(COMMAND, CMD_DETAILS); - command.add("slave", "false"); - NamedList nl = snapPuller.getCommandResponse(command); + NamedList nl = snapPuller.getDetails(); slave.add("masterDetails", nl.get(CMD_DETAILS)); } catch (Exception e) { LOG.warn("Exception while invoking 'details' method for replication on master ", e); diff --git a/solr/core/src/java/org/apache/solr/handler/SnapPuller.java b/solr/core/src/java/org/apache/solr/handler/SnapPuller.java index 069d90195ff..445e7a81fba 100644 --- a/solr/core/src/java/org/apache/solr/handler/SnapPuller.java +++ b/solr/core/src/java/org/apache/solr/handler/SnapPuller.java @@ -17,22 +17,19 @@ package org.apache.solr.handler; import org.apache.commons.io.IOUtils; -import org.apache.http.Header; -import org.apache.http.HttpResponse; -import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; -import org.apache.http.client.entity.UrlEncodedFormEntity; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.message.AbstractHttpMessage; -import org.apache.http.message.BasicNameValuePair; import org.apache.lucene.index.IndexCommit; +import org.apache.solr.client.solrj.SolrServer; +import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.HttpClientUtil; +import org.apache.solr.client.solrj.impl.HttpSolrServer; +import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.FastInputStream; import org.apache.solr.util.FileUtils; -import org.apache.solr.common.util.JavaBinCodec; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.SolrCore; import org.apache.solr.core.IndexDeletionPolicyWrapper; @@ -57,7 +54,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.zip.Adler32; import java.util.zip.Checksum; -import java.util.zip.GZIPInputStream; import java.util.zip.InflaterInputStream; /** @@ -116,13 +112,14 @@ public class SnapPuller { // HttpClient for this instance if connectionTimeout or readTimeout has been specified private final HttpClient myHttpClient; - private static synchronized HttpClient createHttpClient(String connTimeout, String readTimeout, String httpBasicAuthUser, String httpBasicAuthPassword) { + private static synchronized HttpClient createHttpClient(String connTimeout, String readTimeout, String httpBasicAuthUser, String httpBasicAuthPassword, boolean useCompression) { if (connTimeout == null && readTimeout == null && client != null) return client; final ModifiableSolrParams httpClientParams = new ModifiableSolrParams(); httpClientParams.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connTimeout != null ? connTimeout : "5000"); httpClientParams.set(HttpClientUtil.PROP_SO_TIMEOUT, readTimeout != null ? readTimeout : "20000"); httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_USER, httpBasicAuthUser); httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword); + httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression); // Keeping a very high number so that if you have a large number of cores // no requests are kept waiting for an idle connection. httpClientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000); @@ -132,13 +129,19 @@ public class SnapPuller { return httpClient; } - public SnapPuller(NamedList initArgs, ReplicationHandler handler, SolrCore sc) { + public SnapPuller(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) { solrCore = sc; - SolrParams params = SolrParams.toSolrParams(initArgs); - masterUrl = (String) initArgs.get(MASTER_URL); + final SolrParams params = SolrParams.toSolrParams(initArgs); + String masterUrl = (String) initArgs.get(MASTER_URL); if (masterUrl == null) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "'masterUrl' is required for a slave"); + if (masterUrl.endsWith("/replication")) { + masterUrl = masterUrl.substring(0, masterUrl.length()-12); + LOG.warn("'masterUrl' must be specified without the /replication suffix"); + } + this.masterUrl = masterUrl; + this.replicationHandler = handler; pollIntervalStr = (String) initArgs.get(POLL_INTERVAL); pollInterval = readInterval(pollIntervalStr); @@ -149,7 +152,7 @@ public class SnapPuller { String readTimeout = (String) initArgs.get(HttpClientUtil.PROP_SO_TIMEOUT); String httpBasicAuthUser = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER); String httpBasicAuthPassword = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS); - myHttpClient = createHttpClient(connTimeout, readTimeout, httpBasicAuthUser, httpBasicAuthPassword); + myHttpClient = createHttpClient(connTimeout, readTimeout, httpBasicAuthUser, httpBasicAuthPassword, useExternal); if (pollInterval != null && pollInterval > 0) { startExecutorService(); } else { @@ -183,82 +186,49 @@ public class SnapPuller { */ @SuppressWarnings("unchecked") NamedList getLatestVersion() throws IOException { - HttpPost post = new HttpPost(masterUrl); - List formparams = new ArrayList(); - formparams.add(new BasicNameValuePair("wt", "javabin")); - formparams.add(new BasicNameValuePair(COMMAND, CMD_INDEX_VERSION)); - UrlEncodedFormEntity entity = new UrlEncodedFormEntity(formparams, "UTF-8"); - post.setEntity(entity); - return getNamedListResponse(post); - } - - NamedList getCommandResponse(NamedList commands) throws IOException { - - HttpPost post = new HttpPost(masterUrl); - - List formparams = new ArrayList(); - formparams.add(new BasicNameValuePair("wt", "javabin")); - - for (Map.Entry c : commands) { - formparams.add(new BasicNameValuePair(c.getKey(), c.getValue())); - } - UrlEncodedFormEntity entity = new UrlEncodedFormEntity(formparams, "UTF-8"); - post.setEntity(entity); - return getNamedListResponse(post); - } - - private NamedList getNamedListResponse(HttpPost method) throws IOException { - InputStream input = null; - NamedList result = null; + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(COMMAND, CMD_INDEX_VERSION); + params.set(CommonParams.WT, "javabin"); + params.set(CommonParams.QT, "/replication"); + QueryRequest req = new QueryRequest(params); + SolrServer server = new HttpSolrServer(masterUrl, myHttpClient); //XXX modify to use shardhandler try { - HttpResponse response = myHttpClient.execute(method); - int status = response.getStatusLine().getStatusCode(); - if (status != HttpStatus.SC_OK) { - throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, - "Request failed for the url " + method); - } - input = response.getEntity().getContent(); - result = (NamedList)new JavaBinCodec().unmarshal(input); - } finally { - try { - if (input != null) { - input.close(); - } - } catch (Exception e) { - } + return server.request(req); + } catch (SolrServerException e) { + throw new IOException(e); } - return result; } /** - * Fetches the list of files in a given index commit point + * Fetches the list of files in a given index commit point and updates internal list of files to download. */ - void fetchFileList(long gen) throws IOException { - HttpPost post = new HttpPost(masterUrl); + private void fetchFileList(long gen) throws IOException { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(COMMAND, CMD_GET_FILE_LIST); + params.set(GENERATION, String.valueOf(gen)); + params.set(CommonParams.WT, "javabin"); + params.set(CommonParams.QT, "/replication"); + QueryRequest req = new QueryRequest(params); + SolrServer server = new HttpSolrServer(masterUrl, myHttpClient); //XXX modify to use shardhandler - List formparams = new ArrayList(); - formparams.add(new BasicNameValuePair("wt", "javabin")); - formparams.add(new BasicNameValuePair(COMMAND, CMD_GET_FILE_LIST)); - formparams.add(new BasicNameValuePair(GENERATION, String.valueOf(gen))); + try { + NamedList response = server.request(req); - UrlEncodedFormEntity entity = new UrlEncodedFormEntity(formparams, "UTF-8"); - post.setEntity(entity); + List> files = (List>) response.get(CMD_GET_FILE_LIST); + if (files != null) + filesToDownload = Collections.synchronizedList(files); + else { + filesToDownload = Collections.emptyList(); + LOG.error("No files to download for index generation: "+ gen); + } - @SuppressWarnings("unchecked") - NamedList>> nl - = (NamedList>>) getNamedListResponse(post); + files = (List>) response.get(CONF_FILES); + if (files != null) + confFilesToDownload = Collections.synchronizedList(files); - List> f = nl.get(CMD_GET_FILE_LIST); - if (f != null) - filesToDownload = Collections.synchronizedList(f); - else { - filesToDownload = Collections.emptyList(); - LOG.error("No files to download for index generation: "+ gen); + } catch (SolrServerException e) { + throw new IOException(e); } - - f = nl.get(CONF_FILES); - if (f != null) - confFilesToDownload = Collections.synchronizedList(f); } /** @@ -270,7 +240,7 @@ public class SnapPuller { * @throws IOException if an exception occurs */ @SuppressWarnings("unchecked") - boolean successfulInstall = false; + private boolean successfulInstall = false; boolean fetchLatestIndex(SolrCore core, boolean force) throws IOException, InterruptedException { successfulInstall = false; @@ -953,8 +923,6 @@ public class SnapPuller { private boolean isConf; - private HttpPost post; - private boolean aborted = false; private Long indexGen; @@ -1130,79 +1098,62 @@ public class SnapPuller { * Open a new stream using HttpClient */ FastInputStream getStream() throws IOException { - post = new HttpPost(masterUrl); - //the method is command=filecontent - - List formparams = new ArrayList(); - - formparams.add(new BasicNameValuePair(COMMAND, CMD_GET_FILE)); + SolrServer s = new HttpSolrServer(masterUrl, myHttpClient, null); //XXX use shardhandler + ModifiableSolrParams params = new ModifiableSolrParams(); +// //the method is command=filecontent + params.set(COMMAND, CMD_GET_FILE); + params.set(GENERATION, Long.toString(indexGen)); + params.set(CommonParams.QT, "/replication"); //add the version to download. This is used to reserve the download - formparams.add(new BasicNameValuePair(GENERATION, indexGen.toString())); if (isConf) { //set cf instead of file for config file - formparams.add(new BasicNameValuePair(CONF_FILE_SHORT, fileName)); + params.set(CONF_FILE_SHORT, fileName); } else { - formparams.add(new BasicNameValuePair(FILE, fileName)); + params.set(FILE, fileName); } if (useInternal) { - formparams.add(new BasicNameValuePair(COMPRESSION, "true")); - } - if (useExternal) { - formparams.add(new BasicNameValuePair("Accept-Encoding", "gzip,deflate")); + params.set(COMPRESSION, "internal"); } //use checksum - if (this.includeChecksum) - formparams.add(new BasicNameValuePair(CHECKSUM, "true")); + if (this.includeChecksum) { + params.set(CHECKSUM, true); + } //wt=filestream this is a custom protocol - formparams.add(new BasicNameValuePair("wt", FILE_STREAM)); - // This happen if there is a failure there is a retry. the offset= ensures that - // the server starts from the offset + params.set(CommonParams.WT, FILE_STREAM); + // This happen if there is a failure there is a retry. the offset= ensures that + // the server starts from the offset if (bytesDownloaded > 0) { - formparams.add(new BasicNameValuePair(OFFSET, "" + bytesDownloaded)); + params.set(OFFSET, Long.toString(bytesDownloaded)); } - UrlEncodedFormEntity entity = new UrlEncodedFormEntity(formparams, "UTF-8"); - post.setEntity(entity); - HttpResponse response = myHttpClient.execute(post); - InputStream is = response.getEntity().getContent(); - //wrap it using FastInputStream - if (useInternal) { - is = new InflaterInputStream(is); - } else if (useExternal) { - is = checkCompressed(post, is); + NamedList response; + InputStream is = null; + try { + QueryRequest req = new QueryRequest(params); + response = s.request(req); + is = (InputStream) response.get("stream"); + if(useInternal) { + is = new InflaterInputStream(is); + } + return new FastInputStream(is); + } catch (Throwable t) { + //close stream on error + IOUtils.closeQuietly(is); + throw new IOException("Could not download file '" + fileName + "'", t); } - return new FastInputStream(is); } } - - /* - * This is copied from CommonsHttpSolrServer - */ - private InputStream checkCompressed(AbstractHttpMessage method, InputStream respBody) throws IOException { - Header contentEncodingHeader = method.getFirstHeader("Content-Encoding"); - if (contentEncodingHeader != null) { - String contentEncoding = contentEncodingHeader.getValue(); - if (contentEncoding.contains("gzip")) { - respBody = new GZIPInputStream(respBody); - } else if (contentEncoding.contains("deflate")) { - respBody = new InflaterInputStream(respBody); - } - } else { - Header contentTypeHeader = method.getFirstHeader("Content-Type"); - if (contentTypeHeader != null) { - String contentType = contentTypeHeader.getValue(); - if (contentType != null) { - if (contentType.startsWith("application/x-gzip-compressed")) { - respBody = new GZIPInputStream(respBody); - } else if (contentType.startsWith("application/x-deflate")) { - respBody = new InflaterInputStream(respBody); - } - } - } - } - return respBody; + + NamedList getDetails() throws IOException, SolrServerException { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(COMMAND, CMD_DETAILS); + params.set("slave", false); + params.set(CommonParams.QT, "/replication"); + SolrServer server = new HttpSolrServer(masterUrl, myHttpClient); //XXX use shardhandler + QueryRequest request = new QueryRequest(params); + return server.request(request); } static Integer readInterval(String interval) { diff --git a/solr/core/src/test-files/solr/conf/solrconfig-slave.xml b/solr/core/src/test-files/solr/conf/solrconfig-slave.xml index 67b3f1dc6c5..5ee5459212a 100644 --- a/solr/core/src/test-files/solr/conf/solrconfig-slave.xml +++ b/solr/core/src/test-files/solr/conf/solrconfig-slave.xml @@ -51,7 +51,7 @@ - http://localhost:TEST_PORT/solr/replication + http://localhost:TEST_PORT/solr 00:00:01 diff --git a/solr/example/solr/conf/solrconfig.xml b/solr/example/solr/conf/solrconfig.xml index 64bf60ac0ff..b78c295e098 100755 --- a/solr/example/solr/conf/solrconfig.xml +++ b/solr/example/solr/conf/solrconfig.xml @@ -1089,7 +1089,7 @@ schema.xml,stopwords.txt - http://localhost:8983/solr/replication + http://localhost:8983/solr 00:00:60 diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java index 7fa39d8aff4..ad3c5bcb4a8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java @@ -201,8 +201,10 @@ public class HttpSolrServer extends SolrServer { // The parser 'wt=' and 'version=' params are used instead of the original // params ModifiableSolrParams wparams = new ModifiableSolrParams(params); - wparams.set(CommonParams.WT, parser.getWriterType()); - wparams.set(CommonParams.VERSION, parser.getVersion()); + if (parser != null) { + wparams.set(CommonParams.WT, parser.getWriterType()); + wparams.set(CommonParams.VERSION, parser.getVersion()); + } if (invariantParams != null) { wparams.add(invariantParams); } @@ -352,7 +354,6 @@ public class HttpSolrServer extends SolrServer { int httpStatus = response.getStatusLine().getStatusCode(); // Read the contents - String charset = EntityUtils.getContentCharSet(response.getEntity()); respBody = response.getEntity().getContent(); // handle some http level checks before trying to parse the response @@ -377,6 +378,13 @@ public class HttpSolrServer extends SolrServer { + response.getStatusLine().getReasonPhrase()); } + if (processor == null) { + // no processor specified, return raw stream + NamedList rsp = new NamedList(); + rsp.add("stream", respBody); + return rsp; + } + String charset = EntityUtils.getContentCharSet(response.getEntity()); NamedList rsp = processor.processResponse(respBody, charset); if (httpStatus != HttpStatus.SC_OK) { String reason = null; @@ -409,7 +417,7 @@ public class HttpSolrServer extends SolrServer { throw new SolrServerException( "IOException occured when talking to server at: " + getBaseURL(), e); } finally { - if (respBody != null) { + if (respBody != null && processor!=null) { try { respBody.close(); } catch (Throwable t) {} // ignore diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/BasicHttpSolrServerTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/BasicHttpSolrServerTest.java index a291a360e6d..5a93ba29a2e 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/BasicHttpSolrServerTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/BasicHttpSolrServerTest.java @@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.impl; import java.io.IOException; +import java.io.InputStream; import java.net.MalformedURLException; import java.net.Socket; import java.util.Enumeration; @@ -38,10 +39,12 @@ import org.apache.solr.SolrJettyTestBase; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrRequest.METHOD; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.util.NamedList; import org.apache.solr.util.ExternalPaths; import org.junit.BeforeClass; import org.junit.Test; @@ -134,6 +137,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase { } catch (SolrServerException e) { assertTrue(e.getMessage().contains("refused")); } + server.shutdown(); } @Test @@ -148,6 +152,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase { } catch (SolrServerException e) { assertTrue(e.getMessage().contains("Timeout")); } + server.shutdown(); } @Test @@ -234,6 +239,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase { assertEquals("keep-alive", DebugServlet.headers.get("Connection")); assertEquals("application/x-www-form-urlencoded; charset=UTF-8", DebugServlet.headers.get("Content-Type")); assertEquals("UTF-8", DebugServlet.headers.get("Content-Charset")); + server.shutdown(); } @Test @@ -274,6 +280,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase { assertEquals(server.getParser().getVersion(), DebugServlet.parameters.get(CommonParams.VERSION)[0]); assertEquals("Solr[" + org.apache.solr.client.solrj.impl.HttpSolrServer.class.getName() + "] 1.0", DebugServlet.headers.get("User-Agent")); assertEquals("keep-alive", DebugServlet.headers.get("Connection")); + server.shutdown(); } @Test @@ -335,6 +342,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase { assertEquals("application/javabin", DebugServlet.headers.get("Content-Type")); assertEquals(1, DebugServlet.parameters.get("a").length); assertEquals("\u1234", DebugServlet.parameters.get("a")[0]); + server.shutdown(); } @Test @@ -355,6 +363,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase { } catch (Throwable t) { fail("Exception was thrown:" + t); } + server.shutdown(); } @Test @@ -406,6 +415,7 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase { q = new SolrQuery("foo"); QueryResponse response = server.query(q); assertEquals(0, response.getStatus()); + server.shutdown(); } @Test @@ -420,8 +430,21 @@ public class BasicHttpSolrServerTest extends SolrJettyTestBase { server.setDefaultMaxConnectionsPerHost(1); fail("Operation should not succeed."); } catch (UnsupportedOperationException e) {} + client.getConnectionManager().shutdown(); } - + + @Test + public void testGetRawStream() throws SolrServerException, IOException{ + HttpClient client = HttpClientUtil.createClient(null); + HttpSolrServer server = new HttpSolrServer("http://127.0.0.1:" + jetty.getLocalPort() + "/solr", client, null); + QueryRequest req = new QueryRequest(); + NamedList response = server.request(req); + InputStream stream = (InputStream)response.get("stream"); + assertNotNull(stream); + stream.close(); + client.getConnectionManager().shutdown(); + } + private int findUnusedPort() { for (int port = 0; port < 65535; port++) { Socket s = new Socket();