SOLR-6787 API to manage blobs in Solr

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1644095 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2014-12-09 15:58:32 +00:00
parent 8c80635328
commit 3a102c548a
12 changed files with 537 additions and 48 deletions

View File

@ -23,6 +23,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Writer;
import java.lang.reflect.Constructor;
import java.net.URL;
@ -64,6 +65,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@ -75,6 +77,7 @@ import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.SnapPuller;
import org.apache.solr.handler.SolrConfigHandler;
@ -2091,9 +2094,29 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
m.put("csv", new CSVResponseWriter());
m.put("xsort", new SortingResponseWriter());
m.put("schema.xml", new SchemaXmlResponseWriter());
m.put(ReplicationHandler.FILE_STREAM, getFileStreamWriter());
DEFAULT_RESPONSE_WRITERS = Collections.unmodifiableMap(m);
}
private static BinaryResponseWriter getFileStreamWriter() {
return new BinaryResponseWriter(){
@Override
public void write(OutputStream out, SolrQueryRequest req, SolrQueryResponse response) throws IOException {
RawWriter rawWriter = (RawWriter) response.getValues().get(ReplicationHandler.FILE_STREAM);
rawWriter.write(out);
}
@Override
public String getContentType(SolrQueryRequest request, SolrQueryResponse response) {
return BinaryResponseParser.BINARY_CONTENT_TYPE;
}
};
}
public interface RawWriter {
public void write(OutputStream os) throws IOException ;
}
/** Configure the query response writers. There will always be a default writer; additional
* writers may also be configured. */
private void initWriters() {

View File

@ -0,0 +1,267 @@
package org.apache.solr.handler;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.text.MessageFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.lucene.index.StorableField;
import org.apache.lucene.index.StoredDocument;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.FieldType;
import org.apache.solr.search.QParser;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.util.SimplePostTool;
import org.apache.solr.util.plugin.PluginInfoInitialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
public class BlobHandler extends RequestHandlerBase implements PluginInfoInitialized{
protected static final Logger log = LoggerFactory.getLogger(BlobHandler.class);
private static final long MAX_SZ = 5*1024*1024;//2MB
private long maxSize = MAX_SZ;
@Override
public void handleRequestBody(final SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
String httpMethod = (String) req.getContext().get("httpMethod");
String path = (String) req.getContext().get("path");
SolrConfigHandler.setWt(req,"json");
List<String> pieces = StrUtils.splitSmart(path, '/');
String blobName = null;
if(pieces.size()>=3) blobName = pieces.get(2);
if("POST".equals(httpMethod)) {
if (blobName == null || blobName.isEmpty()) {
rsp.add("error","Name not found");
return;
}
if(req.getContentStreams() == null ) {
rsp.add("error","No stream");
return;
}
for (ContentStream stream : req.getContentStreams()) {
ByteBuffer payload = SimplePostTool.inputStreamToByteArray(stream.getStream(), maxSize);
MessageDigest m = MessageDigest.getInstance("MD5");
m.update(payload.array(),payload.position(),payload.limit());
String md5 = new BigInteger(1,m.digest()).toString(16);
TopDocs duplicate = req.getSearcher().search(new TermQuery(new Term("id", md5)), 1);
if(duplicate.totalHits >0){
rsp.add("error", "duplicate entry");
SolrParams solrParams = new MapSolrParams((Map) ZkNodeProps.makeMap(
"q", "id:" + md5,
"fl", "id,size,version,timestamp,blobName"));
try(LocalSolrQueryRequest r = new LocalSolrQueryRequest(req.getCore(), solrParams)) {
req.getCore().getRequestHandler(null).handleRequest(r, rsp);
}
return;
}
TopFieldDocs docs = req.getSearcher().search(new TermQuery(new Term("blobName", blobName)),
null, 1, new Sort(new SortField("version", SortField.Type.LONG, true)));
long version = 0;
if(docs.totalHits >0){
StoredDocument doc = req.getSearcher().doc(docs.scoreDocs[0].doc);
Number n = doc.getField("version").numericValue();
version = n.longValue();
}
indexMap(req, makeMap(
"id", md5,
"blobName", blobName,
"version", ++version,
"timestamp", new Date(),
"size", payload.limit(),
"blob", payload));
break;
}
} else {
int version =-1;
if(pieces.size()>3){
try {
version = Integer.parseInt(pieces.get(3));
} catch (NumberFormatException e) {
rsp.add("error", "Invalid version" + pieces.get(3));
return;
}
}
if(ReplicationHandler.FILE_STREAM.equals(req.getParams().get(CommonParams.WT))){
if(blobName == null ){
throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "Please send the request in the format /blob/<blobName>/<version>");
} else {
String q = "blobName:{0}";
if(version!=-1) q+= " AND version:{1}";
QParser qparser = QParser.getParser(MessageFormat.format(q,blobName,version) , "lucene", req);
final TopDocs docs = req.getSearcher().search(qparser.parse(), 1, new Sort( new SortField("version", SortField.Type.LONG, true)));
if(docs.totalHits>0){
rsp.add(ReplicationHandler.FILE_STREAM, new SolrCore.RawWriter(){
@Override
public void write(OutputStream os) throws IOException {
StoredDocument doc = req.getSearcher().doc(docs.scoreDocs[0].doc);
StorableField sf = doc.getField("blob");
FieldType fieldType = req.getSchema().getField("blob").getType();
ByteBuffer buf = (ByteBuffer) fieldType.toObject(sf);
if(buf == null){
//should never happen unless a user wrote this document directly
throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "Invalid document . No field called blob");
} else {
os.write(buf.array(),0,buf.limit());
}
}
});
} else {
throw new SolrException(SolrException.ErrorCode.NOT_FOUND,
MessageFormat.format("Invalid combination of blobName {0} and version {1}", blobName,version));
}
}
} else {
String q = "*:*";
if (blobName != null) q = "blobName" + ":" + blobName;
if (version > -1) q += " AND version:" + version;
SolrParams args = new MapSolrParams((Map) ZkNodeProps.makeMap(
"q", q,
"fl", "id,size,version,timestamp,blobName",
"sort", "version desc"));
try (LocalSolrQueryRequest r = new LocalSolrQueryRequest(req.getCore(), args)){
req.getCore().getRequestHandler(null).handleRequest(r, rsp);
}
}
}
}
public static void indexMap(SolrQueryRequest req, Map<String, Object> doc) throws IOException {
SolrInputDocument solrDoc = new SolrInputDocument();
for (Map.Entry<String, Object> e : doc.entrySet()) solrDoc.addField(e.getKey(),e.getValue());
UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(req.getParams().get(UpdateParams.UPDATE_CHAIN));
UpdateRequestProcessor processor = processorChain.createProcessor(req,null);
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.commitWithin =1;
cmd.solrDoc = solrDoc;
processor.processAdd(cmd);
}
@Override
public SolrRequestHandler getSubHandler(String subPath) {
if(StrUtils.splitSmart(subPath,'/').size()>4) return null;
return this;
}
//////////////////////// SolrInfoMBeans methods //////////////////////
@Override
public String getDescription() {
return "Load Jars into a system index";
}
public static final String SCHEMA = "<?xml version='1.0' ?>\n" +
"<schema name='_system collection or core' version='1.1'>\n" +
" <fieldtype name='string' class='solr.StrField' sortMissingLast='true' omitNorms='true'/>\n" +
" <fieldType name='long' class='solr.TrieLongField' precisionStep='0' positionIncrementGap='0'/>\n" +
" <fieldType name='bytes' class='solr.BinaryField'/>\n" +
" <fieldType name='date' class='solr.TrieDateField'/>\n" +
" <field name='id' type='string' indexed='true' stored='true' multiValued='false' required='true'/>\n" +
" <field name='blob' type='bytes' indexed='false' stored='true' multiValued='false' />\n" +
" <field name='size' type='long' indexed='false' stored='true' multiValued='false' />\n" +
" <field name='version' type='long' indexed='true' stored='true' multiValued='false' />\n" +
" <field name='timestamp' type='date' indexed='true' stored='true' multiValued='false' />\n" +
" <field name='blobName' type='string' indexed='true' stored='true' multiValued='false' />\n" +
" <field name='_version_' type='long' indexed='true' stored='true'/>\n" +
" <uniqueKey>id</uniqueKey>\n" +
"</schema>" ;
public static final String CONF = "<?xml version='1.0' ?>\n" +
"<config>\n" +
"<luceneMatchVersion>LATEST</luceneMatchVersion>\n" +
"<directoryFactory name='DirectoryFactory' class='${solr.directoryFactory:solr.StandardDirectoryFactory}'/>\n" +
"<updateHandler class='solr.DirectUpdateHandler2'>\n" +
" <updateLog>\n" +
" <str name='dir'>${solr.ulog.dir:}</str>\n" +
" </updateLog>\n" +
"</updateHandler>\n" +
"<requestHandler name='standard' class='solr.StandardRequestHandler' default='true' />\n" +
"<requestHandler name='/analysis/field' startup='lazy' class='solr.FieldAnalysisRequestHandler' />\n" +
"<requestHandler name='/blob' class='solr.BlobHandler'>\n" +
" <lst name='invariants'>\n"+
"<str name='maxSize'>${blob.max.size.mb:5}</str>\n"+
"</lst>\n"+
"</requestHandler>\n" +
"</config>" ;
@Override
public void init(PluginInfo info) {
super.init(info.initArgs);
if(info.initArgs != null ){
NamedList invariants = (NamedList) info.initArgs.get(PluginInfo.INVARIANTS);
if(invariants != null){
Object o = invariants.get("maxSize");
if(o != null){
maxSize = Long.parseLong(String.valueOf(o));
maxSize = maxSize*1024*1024;
}
}
}
}
}

View File

@ -860,7 +860,6 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
@SuppressWarnings("unchecked")
public void inform(SolrCore core) {
this.core = core;
registerFileStreamResponseWriter();
registerCloseHook();
Object nbtk = initArgs.get(NUMBER_BACKUPS_TO_KEEP_INIT_PARAM);
if(nbtk!=null) {
@ -1009,34 +1008,6 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
});
}
/**
* A ResponseWriter is registered automatically for wt=filestream This response writer is used to transfer index files
* in a block-by-block manner within the same HTTP response.
*/
private void registerFileStreamResponseWriter() {
core.registerResponseWriter(FILE_STREAM, new BinaryQueryResponseWriter() {
@Override
public void write(OutputStream out, SolrQueryRequest request, SolrQueryResponse resp) throws IOException {
DirectoryFileStream stream = (DirectoryFileStream) resp.getValues().get(FILE_STREAM);
stream.write(out);
}
@Override
public void write(Writer writer, SolrQueryRequest request, SolrQueryResponse response) {
throw new RuntimeException("This is a binary writer , Cannot write to a characterstream");
}
@Override
public String getContentType(SolrQueryRequest request, SolrQueryResponse response) {
return BinaryResponseParser.BINARY_CONTENT_TYPE;
}
@Override
public void init(NamedList args) { /*no op*/ }
});
}
/**
* Register a listener for postcommit/optimize
*
@ -1099,7 +1070,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
/**This class is used to read and send files in the lucene index
*
*/
private class DirectoryFileStream {
private class DirectoryFileStream implements SolrCore.RawWriter {
protected SolrParams params;
protected FastOutputStream fos;

View File

@ -21,6 +21,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.RequestHandlers;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoMBean;
import org.apache.solr.request.SolrQueryRequest;
@ -218,6 +219,9 @@ public abstract class RequestHandlerBase implements SolrRequestHandler, SolrInfo
String firstPart = handlerName.substring(0, idx);
handler = reqHandlers.get(firstPart);
if (handler == null) continue;
if(handler instanceof RequestHandlers.LazyRequestHandlerWrapper) {
handler = ((RequestHandlers.LazyRequestHandlerWrapper)handler).getWrappedHandler();
}
if (handler instanceof NestedRequestHandler) {
return ((NestedRequestHandler) handler).getSubHandler(handlerName.substring(idx));
}

View File

@ -64,6 +64,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.RE
import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@ -91,6 +92,7 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@ -102,9 +104,11 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.BlobHandler;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -738,12 +742,38 @@ public class CollectionsHandler extends RequestHandlerBase {
AUTO_ADD_REPLICAS,
"router.");
if(SYSTEM_COLL.equals(name)){
//We must always create asystem collection with only a single shard
props.put(NUM_SLICES,1);
props.remove(SHARDS_PROP);
createSysConfigSet();
}
copyPropertiesIfNotNull(req.getParams(), props);
ZkNodeProps m = new ZkNodeProps(props);
handleResponse(CREATE.toLower(), m, rsp);
}
private void createSysConfigSet() throws KeeperException, InterruptedException {
SolrZkClient zk = coreContainer.getZkController().getZkStateReader().getZkClient();
createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE+"/"+SYSTEM_COLL, null);
createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE+"/"+SYSTEM_COLL+"/schema.xml", BlobHandler.SCHEMA.replaceAll("'","\"").getBytes(StandardCharsets.UTF_8));
createNodeIfNotExists(zk, ZkStateReader.CONFIGS_ZKNODE + "/" + SYSTEM_COLL + "/solrconfig.xml", BlobHandler.CONF.replaceAll("'", "\"").getBytes(StandardCharsets.UTF_8));
}
public static void createNodeIfNotExists(SolrZkClient zk, String path, byte[] data) throws KeeperException, InterruptedException {
if(!zk.exists(path, true)){
//create the config znode
try {
zk.create(path,data, CreateMode.PERSISTENT,true);
} catch (KeeperException.NodeExistsException e) {
//no problem . race condition. carry on the good work
}
}
}
private void handleRemoveReplica(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
log.info("Remove replica: " + req.getParamString());
req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP, "replica");
@ -928,5 +958,6 @@ public class CollectionsHandler extends RequestHandlerBase {
public String getDescription() {
return "Manage SolrCloud Collections";
}
public static final String SYSTEM_COLL =".system";
}

View File

@ -24,6 +24,7 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.core.SolrCore;
import java.io.Closeable;
import java.util.Map;
import java.util.HashMap;
@ -40,7 +41,7 @@ import java.util.HashMap;
*
*
*/
public abstract class SolrQueryRequestBase implements SolrQueryRequest {
public abstract class SolrQueryRequestBase implements SolrQueryRequest, Closeable {
protected final SolrCore core;
protected final SolrParams origParams;
protected volatile IndexSchema schema;

View File

@ -681,11 +681,8 @@ public class SolrRequestParsers
if (ServletFileUpload.isMultipartContent(req)) {
return multipart.parseParamsAndFillStreams(req, streams);
}
if (req.getContentType() != null) {
return raw.parseParamsAndFillStreams(req, streams);
}
throw new SolrException(ErrorCode.UNSUPPORTED_MEDIA_TYPE, "Must specify a Content-Type header with POST requests");
}
throw new SolrException(ErrorCode.BAD_REQUEST, "Unsupported method: " + method + " for request " + req);
}
}

View File

@ -32,6 +32,8 @@ import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
@ -60,6 +62,7 @@ import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
@ -581,14 +584,14 @@ public class SimplePostTool {
URL postUrl = new URL(appendParam(solrUrl.toString(),
"literal.id="+URLEncoder.encode(u.toString(),"UTF-8") +
"&literal.url="+URLEncoder.encode(u.toString(),"UTF-8")));
boolean success = postData(new ByteArrayInputStream(result.content), null, out, result.contentType, postUrl);
boolean success = postData(new ByteArrayInputStream(result.content.array(), result.content.arrayOffset(),result.content.limit() ), null, out, result.contentType, postUrl);
if (success) {
info("POSTed web resource "+u+" (depth: "+level+")");
Thread.sleep(delay * 1000);
numPages++;
// Pull links from HTML pages only
if(recursive > level && result.contentType.equals("text/html")) {
Set<URL> children = pageFetcher.getLinksFromWebPage(u, new ByteArrayInputStream(result.content), result.contentType, postUrl);
Set<URL> children = pageFetcher.getLinksFromWebPage(u, new ByteBufferInputStream(result.content), result.contentType, postUrl);
subStack.addAll(children);
}
} else {
@ -609,23 +612,35 @@ public class SimplePostTool {
}
return numPages;
}
public static class BAOS extends ByteArrayOutputStream {
public ByteBuffer getByteBuffer() {
return ByteBuffer.wrap(super.buf,0,super.count);
}
}
public static ByteBuffer inputStreamToByteArray(InputStream is) throws IOException {
return inputStreamToByteArray(is,Integer.MAX_VALUE);
}
/**
* Reads an input stream into a byte array
*
* @param is the input stream
* @return the byte array
* @throws IOException If there is a low-level I/O error.
*/
protected byte[] inputStreamToByteArray(InputStream is) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
public static ByteBuffer inputStreamToByteArray(InputStream is, long maxSize) throws IOException {
BAOS bos = new BAOS();
long sz = 0;
int next = is.read();
while (next > -1) {
if(++sz > maxSize) throw new BufferOverflowException();
bos.write(next);
next = is.read();
}
bos.flush();
is.close();
return bos.toByteArray();
return bos.getByteBuffer();
}
/**
@ -1198,6 +1213,6 @@ public class SimplePostTool {
int httpStatus = 200;
String contentType = "text/html";
URL redirectUrl = null;
byte[] content;
ByteBuffer content;
}
}

View File

@ -0,0 +1,178 @@
package org.apache.solr.handler;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.io.Closeables;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.util.EntityUtils;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.ConfigOverlay;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.util.SimplePostTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.core.ConfigOverlay.getObjectByPath;
public class TestBlobHandler extends AbstractFullDistribZkTestBase {
static final Logger log = LoggerFactory.getLogger(TestSolrConfigHandlerConcurrent.class);
private void doBlobHandlerTest() throws Exception {
SolrServer server = createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0)));
CollectionAdminResponse response1;
CollectionAdminRequest.Create createCollectionRequest = new CollectionAdminRequest.Create();
createCollectionRequest.setCollectionName(".system");
createCollectionRequest.setNumShards(1);
createCollectionRequest.setReplicationFactor(2);
response1 = createCollectionRequest.process(server);
assertEquals(0, response1.getStatus());
assertTrue(response1.isSuccess());
DocCollection sysColl = cloudClient.getZkStateReader().getClusterState().getCollection(".system");
Replica replica = sysColl.getActiveSlicesMap().values().iterator().next().getLeader();
String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
String url = baseUrl + "/.system/config/requestHandler";
Map map = TestSolrConfigHandlerConcurrent.getAsMap(url, cloudClient);
assertNotNull(map);
assertEquals("solr.BlobHandler", getObjectByPath(map, true, Arrays.asList(
"solrConfig",
"requestHandler",
"/blob",
"class")));
byte[] bytarr = new byte[1024];
for (int i = 0; i < bytarr.length; i++) bytarr[i]= (byte) (i % 127);
byte[] bytarr2 = new byte[2048];
for (int i = 0; i < bytarr2.length; i++) bytarr2[i]= (byte) (i % 127);
postAndCheck(baseUrl, bytarr, 1);
postAndCheck(baseUrl, bytarr2, 2);
url = baseUrl + "/.system/blob/test/1";
map = TestSolrConfigHandlerConcurrent.getAsMap(url,cloudClient);
List l = (List) ConfigOverlay.getObjectByPath(map, false, Arrays.asList("response", "docs"));
assertNotNull(l);
map = (Map) l.get(0);
assertEquals(""+bytarr.length,String.valueOf(map.get("size")));
compareInputAndOutput(baseUrl+"/.system/blob/test?wt=filestream", bytarr2);
compareInputAndOutput(baseUrl+"/.system/blob/test/1?wt=filestream", bytarr);
}
@Override
public void tearDown() throws Exception {
super.tearDown();
System.clearProperty("numShards");
System.clearProperty("zkHost");
// insurance
DirectUpdateHandler2.commitOnClose = true;
}
private void postAndCheck(String baseUrl, byte[] bytes, int count) throws Exception {
postData(baseUrl, bytes);
String url;
Map map;
List l;
long startTime = System.nanoTime();
long maxTimeoutSeconds = 10;
while ( true) {
url = baseUrl + "/.system/blob/test";
map = TestSolrConfigHandlerConcurrent.getAsMap(url, cloudClient);
String numFound = String.valueOf(ConfigOverlay.getObjectByPath(map, false, Arrays.asList("response", "numFound")));
if(!(""+count).equals(numFound)) {
if (TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) < maxTimeoutSeconds) {
Thread.sleep(100);
continue;
}
}
l = (List) ConfigOverlay.getObjectByPath(map, false, Arrays.asList("response", "docs"));
assertNotNull(l);
map = (Map) l.get(0);
assertEquals("" + bytes.length, String.valueOf(map.get("size")));
break;
}
}
private void compareInputAndOutput(String url, byte[] bytarr) throws IOException {
HttpClient httpClient = cloudClient.getLbServer().getHttpClient();
HttpGet httpGet = new HttpGet(url);
HttpResponse entity = httpClient.execute(httpGet);
ByteBuffer b = SimplePostTool.inputStreamToByteArray(entity.getEntity().getContent());
try {
assertEquals(b.limit(), bytarr.length);
for (int i = 0; i < bytarr.length; i++) {
assertEquals(b.get(i), bytarr[i]);
}
} finally {
httpGet.releaseConnection();
}
}
private String postData(String baseUrl, byte[] bytarr) throws IOException {
HttpPost httpPost = null;
HttpEntity entity;
String response;
try {
httpPost = new HttpPost(baseUrl+"/.system/blob/test");
httpPost.setHeader("Content-Type","application/octet-stream");
httpPost.setEntity(new ByteArrayEntity(bytarr));
entity = cloudClient.getLbServer().getHttpClient().execute(httpPost).getEntity();
return EntityUtils.toString(entity, StandardCharsets.UTF_8);
} finally {
httpPost.releaseConnection();
}
}
@Override
public void doTest() throws Exception {
doBlobHandlerTest();
}
}

View File

@ -201,6 +201,7 @@ public class TestSolrConfigHandlerConcurrent extends AbstractFullDistribZkTestBa
return (Map) ObjectBuilder.getVal(new JSONParser(new StringReader(response)));
} finally {
EntityUtils.consumeQuietly(entity);
get.releaseConnection();
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.solr.servlet;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
@ -386,15 +387,14 @@ public class SolrRequestParserTest extends SolrTestCaseJ4 {
expect(request.getMethod()).andReturn("POST").anyTimes();
expect(request.getContentType()).andReturn(null).anyTimes();
expect(request.getQueryString()).andReturn(null).anyTimes();
expect(request.getHeader(anyObject())).andReturn(null).anyTimes();
replay(request);
SolrRequestParsers parsers = new SolrRequestParsers(h.getCore().getSolrConfig());
try {
parsers.parse(h.getCore(), "/select", request);
fail("should throw SolrException");
} catch (SolrException e) {
assertTrue(e.getMessage().startsWith("Must specify a Content-Type header with POST requests"));
assertEquals(415, e.code());
fail("should not throw SolrException");
}
}
}

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
@ -225,7 +226,7 @@ public class SimplePostToolTest extends SolrTestCaseJ4 {
}
res.httpStatus = 200;
res.contentType = "text/html";
res.content = htmlMap.get(u.toString()).getBytes(StandardCharsets.UTF_8);
res.content = ByteBuffer.wrap( htmlMap.get(u.toString()).getBytes(StandardCharsets.UTF_8));
return res;
}