SOLR-6633

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1633390 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2014-10-21 14:53:04 +00:00
parent 2e1c562b4b
commit de911858d9
6 changed files with 298 additions and 79 deletions

View File

@ -16,21 +16,34 @@ package org.apache.solr.handler.loader;
* limitations under the License.
*/
import java.io.FilterReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.lang.reflect.Field;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.ctc.wstx.stax.FilteredStreamReader;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.TeeInputStream;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.JsonRecordReader;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.util.xslt.RecordingJSONParser;
import org.noggit.CharArr;
import org.noggit.JSONParser;
import org.noggit.ObjectBuilder;
import org.apache.solr.common.SolrException;
@ -50,50 +63,49 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @since solr 4.0
*/
public class JsonLoader extends ContentStreamLoader {
final static Logger log = LoggerFactory.getLogger(JsonLoader.class);
private static final String CHILD_DOC_KEY = "_childDocuments_";
@Override
public String getDefaultWT() {
return "json";
}
@Override
public void load(SolrQueryRequest req, SolrQueryResponse rsp,
ContentStream stream, UpdateRequestProcessor processor) throws Exception {
new SingleThreadedJsonLoader(req, rsp, processor).load(req, rsp, stream, processor);
}
static class SingleThreadedJsonLoader extends ContentStreamLoader {
protected final UpdateRequestProcessor processor;
protected final SolrQueryRequest req;
protected SolrQueryResponse rsp;
protected JSONParser parser;
protected final int commitWithin;
protected final boolean overwrite;
public SingleThreadedJsonLoader(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor processor) {
this.processor = processor;
this.req = req;
this.rsp = rsp;
commitWithin = req.getParams().getInt(UpdateParams.COMMIT_WITHIN, -1);
overwrite = req.getParams().getBool(UpdateParams.OVERWRITE, true);
}
@Override
public void load(SolrQueryRequest req,
SolrQueryResponse rsp,
ContentStream stream,
UpdateRequestProcessor processor) throws Exception {
Reader reader = null;
try {
reader = stream.getReader();
@ -102,34 +114,32 @@ public class JsonLoader extends ContentStreamLoader {
log.trace("body", body);
reader = new StringReader(body);
}
this.processUpdate(reader);
} finally {
IOUtils.closeQuietly(reader);
}
}
@SuppressWarnings("fallthrough")
void processUpdate(Reader reader) throws IOException {
String path = (String) req.getContext().get("path");
if(UpdateRequestHandler.DOC_PATH.equals(path) || "false".equals( req.getParams().get("json.command"))){
String split = req.getParams().get("split");
String[] f = req.getParams().getParams("f");
handleSplitMode(split, f, reader);
return;
}
parser = new JSONParser(reader);
int ev = parser.nextEvent();
while( ev != JSONParser.EOF ) {
switch( ev )
{
case JSONParser.ARRAY_START:
handleAdds();
break;
case JSONParser.STRING:
if( parser.wasKey() ) {
String v = parser.getString();
@ -167,7 +177,7 @@ public class JsonLoader extends ContentStreamLoader {
break;
}
// fall through
case JSONParser.LONG:
case JSONParser.NUMBER:
case JSONParser.BIGNUMBER:
@ -175,12 +185,12 @@ public class JsonLoader extends ContentStreamLoader {
case JSONParser.NULL:
log.info( "can't have a value here! "
+JSONParser.getEventString(ev)+" "+parser.getPosition() );
case JSONParser.OBJECT_START:
case JSONParser.OBJECT_END:
case JSONParser.ARRAY_END:
break;
default:
log.info("Noggit UNKNOWN_EVENT_ID:"+ev);
break;
@ -190,27 +200,41 @@ public class JsonLoader extends ContentStreamLoader {
}
}
private void handleSplitMode(String split, String[] fields, final Reader reader) throws IOException {
if (split == null) split = "/";
if (fields == null || fields.length == 0) fields = new String[]{"$FQN:/**"};
final boolean echo = "true".equals(req.getParams().get("echo"));
final String srcField = req.getParams().get("srcField");
final boolean mapUniqueKeyOnly = req.getParams().getBool("mapUniqueKeyOnly",false);
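// storing the raw JSON (srcField) only works when each record is a complete top-level object,
// hence the split=/ requirement enforced below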
if (srcField != null) {
if (!"/".equals(split))
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Raw data can be stored only if split=/");
parser = new RecordingJSONParser(reader);
} else {
parser = new JSONParser(reader);
}
JsonRecordReader jsonRecordReader = JsonRecordReader.getInst(split, Arrays.asList(fields));
jsonRecordReader.streamRecords(parser, new JsonRecordReader.Handler() {
ArrayList docs = null;
@Override
public void handle(Map<String, Object> record, String path) {
Map<String, Object> copy = getDocMap(record, parser, srcField, mapUniqueKeyOnly);
if (echo) {
if (docs == null) {
docs = new ArrayList();
rsp.add("docs",docs);
rsp.add("docs", docs);
}
docs.add(copy);
} else {
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.commitWithin = commitWithin;
cmd.overwrite = overwrite;
cmd.solrDoc = new SolrInputDocument();
for (Map.Entry<String, Object> entry : copy.entrySet()) {
cmd.solrDoc.setField(entry.getKey(),entry.getValue());
}
try {
@ -223,6 +247,37 @@ public class JsonLoader extends ContentStreamLoader {
});
}
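/**
 * Builds the map that becomes the SolrInputDocument for one record. If srcField is set (and the
 * parser is a RecordingJSONParser), the raw JSON text of the record is added under that field.
 * If mapUniqueKeyOnly is true, only the uniqueKey is kept as a named field (a random UUID is used
 * when the record has none), the srcField (if present) is carried over, and every original field
 * value is collected into the field named by the 'df' request parameter; e.g. {"id":"1","name":"x"}
 * with df=text and srcField=_src becomes {id:"1", _src:"<raw json>", text:["1","x"]}.
 */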
private Map<String, Object> getDocMap(Map<String, Object> record, JSONParser parser, String srcField, boolean mapUniqueKeyOnly) {
Map result = record;
if(srcField != null && parser instanceof RecordingJSONParser){
// if srcField is specified, extract the raw JSON source into it first
result = new LinkedHashMap(record);
RecordingJSONParser rjp = (RecordingJSONParser) parser;
result.put(srcField, rjp.getBuf());
rjp.resetBuf();
}
if(mapUniqueKeyOnly){
SchemaField sf = req.getSchema().getUniqueKeyField();
if(sf == null) throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No uniqueKey specified in schema");
String df = req.getParams().get(CommonParams.DF);
if(df == null)throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No 'df' specified in request");
Map copy = new LinkedHashMap();
String uniqueField = (String) record.get(sf.getName());
if(uniqueField == null) uniqueField = UUID.randomUUID().toString().toLowerCase(Locale.ROOT);
copy.put(sf.getName(),uniqueField);
if(srcField != null && result.containsKey(srcField)){
copy.put(srcField, result.remove(srcField));
}
copy.put(df, result.values());
result = copy;
}
return result;
}
/*private void handleStreamingSingleDocs() throws IOException
{
while( true ) {
@ -352,18 +407,18 @@ public class JsonLoader extends ContentStreamLoader {
RollbackUpdateCommand parseRollback() throws IOException {
assertNextEvent( JSONParser.OBJECT_START );
assertNextEvent( JSONParser.OBJECT_END );
return new RollbackUpdateCommand(req);
}
void parseCommitOptions(CommitUpdateCommand cmd ) throws IOException
{
assertNextEvent( JSONParser.OBJECT_START );
final Map<String,Object> map = (Map)ObjectBuilder.getVal(parser);
// SolrParams currently expects string values...
SolrParams p = new SolrParams() {
@Override
@ -371,31 +426,31 @@ public class JsonLoader extends ContentStreamLoader {
Object o = map.get(param);
return o == null ? null : o.toString();
}
@Override
public String[] getParams(String param) {
return new String[]{get(param)};
}
@Override
public Iterator<String> getParameterNamesIterator() {
return map.keySet().iterator();
}
};
RequestHandlerUtils.validateCommitParams(p);
p = SolrParams.wrapDefaults(p, req.getParams()); // default to the normal request params for commit options
RequestHandlerUtils.updateCommit(cmd, p);
}
AddUpdateCommand parseAdd() throws IOException
{
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.commitWithin = commitWithin;
cmd.overwrite = overwrite;
float boost = 1.0f;
while( true ) {
int ev = parser.nextEvent();
if( ev == JSONParser.STRING ) {
@ -431,7 +486,7 @@ public class JsonLoader extends ContentStreamLoader {
if( cmd.solrDoc == null ) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,"missing solr document. "+parser.getPosition() );
}
cmd.solrDoc.setDocumentBoost( boost );
return cmd;
}
else {
@ -441,8 +496,8 @@ public class JsonLoader extends ContentStreamLoader {
}
}
}
void handleAdds() throws IOException
{
while( true ) {
@ -458,15 +513,15 @@ public class JsonLoader extends ContentStreamLoader {
processor.processAdd(cmd);
}
}
int assertNextEvent(int expected ) throws IOException
{
int got = parser.nextEvent();
assertEvent(got, expected);
return got;
}
void assertEvent(int ev, int expected) {
if( ev != expected ) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
@ -518,14 +573,14 @@ public class JsonLoader extends ContentStreamLoader {
sif.setValue(val, 1.0f);
}
}
private void parseExtendedFieldValue(SolrInputField sif, int ev) throws IOException {
assert ev == JSONParser.OBJECT_START;
float boost = 1.0f;
Object normalFieldValue = null;
Map<String, Object> extendedInfo = null;
for (;;) {
ev = parser.nextEvent();
switch (ev) {
@ -538,7 +593,7 @@ public class JsonLoader extends ContentStreamLoader {
ev != JSONParser.BIGNUMBER ) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "boost should have number! "+JSONParser.getEventString(ev) );
}
boost = (float)parser.getDouble();
} else if ("value".equals(label)) {
normalFieldValue = parseNormalFieldValue(parser.nextEvent());
@ -553,7 +608,7 @@ public class JsonLoader extends ContentStreamLoader {
extendedInfo.put(label, val);
}
break;
case JSONParser.OBJECT_END:
if (extendedInfo != null) {
if (normalFieldValue != null) {
@ -564,14 +619,14 @@ public class JsonLoader extends ContentStreamLoader {
sif.setValue(normalFieldValue, boost);
}
return;
default:
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error parsing JSON extended field value. Unexpected "+JSONParser.getEventString(ev) );
}
}
}
private Object parseNormalFieldValue(int ev) throws IOException {
if (ev == JSONParser.ARRAY_START) {
List<Object> val = parseArrayFieldValue(ev);
@ -581,8 +636,8 @@ public class JsonLoader extends ContentStreamLoader {
return val;
}
}
private Object parseSingleFieldValue(int ev) throws IOException {
switch (ev) {
case JSONParser.STRING:
@ -604,11 +659,11 @@ public class JsonLoader extends ContentStreamLoader {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error parsing JSON field value. Unexpected "+JSONParser.getEventString(ev) );
}
}
private List<Object> parseArrayFieldValue(int ev) throws IOException {
assert ev == JSONParser.ARRAY_START;
ArrayList lst = new ArrayList(2);
for (;;) {
ev = parser.nextEvent();

View File

@ -0,0 +1,79 @@
package org.apache.solr.util.xslt;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.io.Reader;
import org.noggit.CharArr;
import org.noggit.JSONParser;
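/**
 * A JSONParser that records the raw characters of the current top-level object while it is being
 * consumed, so that JsonLoader can store the original JSON source verbatim when srcField is set.
 * getBuf() returns the text recorded so far; resetBuf() clears it before the next record.
 */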
public class RecordingJSONParser extends JSONParser{
public RecordingJSONParser(Reader in) { super(in); }
private StringBuilder sb = new StringBuilder() ;
private long position;
private boolean objectStarted =false;
@Override
protected int getChar() throws IOException {
int aChar = super.getChar();
if(aChar == '{') objectStarted =true;
if(getPosition() > position) recordChar((char) aChar); // record only if the parser advanced; if a pushback happened, skip this char
position= getPosition();
return aChar;
}
private void recordChar(int aChar) {
if(objectStarted)
sb.append((char) aChar);
}
private void recordStr(String s) {
if(objectStarted) sb.append(s);
}
@Override
public CharArr getStringChars() throws IOException {
CharArr chars = super.getStringChars();
recordStr(chars.toString());
position = getPosition();
// when reading a String, getStringChars() does not return the closing quote (single or double),
// so try to capture it here
if(chars.getArray().length >=chars.getStart()+chars.size()) {
char next = chars.getArray()[chars.getStart() + chars.size()];
if(next =='"' || next == '\'') {
recordChar(next);
}
}
return chars;
}
public void resetBuf(){
sb = new StringBuilder();
objectStarted=false;
}
public String getBuf() {
if(sb != null) return sb.toString();
return null;
}
}
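A minimal usage sketch (not part of this commit; the class name and input string are illustrative, and it assumes the JsonRecordReader API exactly as JsonLoader.handleSplitMode calls it above):

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Map;
import org.apache.solr.common.util.JsonRecordReader;
import org.apache.solr.util.xslt.RecordingJSONParser;

public class RecordingParserSketch {
  public static void main(String[] args) throws IOException {
    Reader in = new StringReader("{\"id\":\"1\"} {\"id\":\"2\"}");
    final RecordingJSONParser parser = new RecordingJSONParser(in);
    JsonRecordReader rdr = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**"));
    rdr.streamRecords(parser, new JsonRecordReader.Handler() {
      @Override
      public void handle(Map<String, Object> record, String path) {
        String raw = parser.getBuf();  // raw text recorded while this object was parsed
        parser.resetBuf();             // clear the buffer before the next record
        System.out.println(path + " -> " + record + " src=" + raw);
      }
    });
  }
}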

View File

@ -31,6 +31,7 @@ import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.processor.BufferingRequestProcessor;
import org.junit.BeforeClass;
import org.junit.Test;
import org.noggit.ObjectBuilder;
import org.xml.sax.SAXException;
import java.math.BigDecimal;
@ -276,7 +277,8 @@ public class JsonLoaderTest extends SolrTestCaseJ4 {
" \"f1\": \"v2\",\n" +
" \"f2\": null\n" +
" }\n";
SolrQueryRequest req = req("json.command","false");
SolrQueryRequest req = req("srcField","_src");
req.getContext().put("path","/update/json/docs");
SolrQueryResponse rsp = new SolrQueryResponse();
BufferingRequestProcessor p = new BufferingRequestProcessor(null);
JsonLoader loader = new JsonLoader();
@ -284,25 +286,26 @@ public class JsonLoaderTest extends SolrTestCaseJ4 {
assertEquals( 2, p.addCommands.size() );
doc = "\n" +
"\n" +
"{bool: true,\n" +
" f0: \"v0\",\n" +
" f2: {\n" +
"{\"bool\": true,\n" +
" \"f0\": \"v0\",\n" +
" \"f2\": {\n" +
" \t \"boost\": 2.3,\n" +
" \t \"value\": \"test\"\n" +
" \t },\n" +
"array: [ \"aaa\", \"bbb\" ],\n" +
"boosted: {\n" +
"\"array\": [ \"aaa\", \"bbb\" ],\n" +
"\"boosted\": {\n" +
" \t \"boost\": 6.7,\n" +
" \t \"value\": [ \"aaa\", \"bbb\" ]\n" +
" \t }\n" +
" }\n" +
"\n" +
"\n" +
" {f1: \"v1\",\n" +
" f1: \"v2\",\n" +
" f2: null\n" +
" {\"f1\": \"v1\",\n" +
" \"f2\": \"v2\",\n" +
" \"f3\": null\n" +
" }\n";
req = req("json.command","false");
req = req("srcField","_src");
req.getContext().put("path","/update/json/docs");
rsp = new SolrQueryResponse();
p = new BufferingRequestProcessor(null);
loader = new JsonLoader();
@ -310,15 +313,40 @@ public class JsonLoaderTest extends SolrTestCaseJ4 {
assertEquals( 2, p.addCommands.size() );
String content = (String) p.addCommands.get(0).solrDoc.getFieldValue("_src");
assertNotNull(content);
Map obj = (Map) ObjectBuilder.fromJSON(content);
assertEquals(Boolean.TRUE, obj.get("bool"));
assertEquals("v0", obj.get("f0"));
assertNotNull(obj.get("f0"));
assertNotNull(obj.get("array"));
assertNotNull(obj.get("boosted"));
content = (String) p.addCommands.get(1).solrDoc.getFieldValue("_src");
assertNotNull(content);
obj = (Map) ObjectBuilder.fromJSON(content);
assertEquals("v1", obj.get("f1"));
assertEquals("v2", obj.get("f2"));
assertTrue(obj.containsKey("f3"));
doc = "[{'id':'1'},{'id':'2'}]".replace('\'', '"');
req = req("json.command","false");
req = req("srcField","_src");
req.getContext().put("path","/update/json/docs");
rsp = new SolrQueryResponse();
p = new BufferingRequestProcessor(null);
loader = new JsonLoader();
loader.load(req, rsp, new ContentStreamBase.StringStream(doc), p);
assertEquals( 2, p.addCommands.size() );
content = (String) p.addCommands.get(0).solrDoc.getFieldValue("_src");
assertNotNull(content);
obj = (Map) ObjectBuilder.fromJSON(content);
assertEquals("1", obj.get("id"));
content = (String) p.addCommands.get(1).solrDoc.getFieldValue("_src");
assertNotNull(content);
obj = (Map) ObjectBuilder.fromJSON(content);
assertEquals("2", obj.get("id"));
}

View File

@ -156,6 +156,7 @@
<field name="content_type" type="string" indexed="true" stored="true" multiValued="true"/>
<field name="last_modified" type="date" indexed="true" stored="true"/>
<field name="links" type="string" indexed="true" stored="true" multiValued="true"/>
<field name="_src" type="string" indexed="false" stored="true"/>
<!-- Main body of document extracted by SolrCell.
NOTE: This field is not indexed by default, since it is also copied to "text"

View File

@ -1057,6 +1057,19 @@
<str name="df">text</str>
</lst>
</initParams>
<initParams path="/update/json/docs">
<lst name="defaults">
<!-- this ensures that the entire JSON document is stored verbatim in one field -->
<str name="srcField">_src</str>
<!-- This means the uniqueKey field will be extracted from the fields and
all field values go into the 'df' field. In this config, 'df' is already configured to be 'text'.
-->
<str name="mapUniqueKeyOnly">true</str>
</lst>
</initParams>
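<!-- With these defaults, posting {"id":"abc1","name":"name1"} to /update/json/docs produces a
document roughly like: id=abc1, _src stores the raw JSON verbatim (not indexed, per the schema),
and text=[abc1, name1], i.e. every leaf value, including the uniqueKey, is copied into the 'df' field. -->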
<!-- The following are implicitly added
<requestHandler name="/update/json" class="solr.UpdateRequestHandler">
<lst name="defaults">

View File

@ -17,13 +17,24 @@
package org.apache.solr.client.solrj.embedded;
import java.io.ByteArrayInputStream;
import java.util.Map;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.InputStreamEntity;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrExampleTests;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.util.ExternalPaths;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.noggit.ObjectBuilder;
/**
* TODO? perhaps use:
@ -52,4 +63,36 @@ public class SolrExampleJettyTest extends SolrExampleTests {
// expected
}
}
@Test
public void testArbitraryJsonIndexing() throws Exception {
HttpSolrServer server = (HttpSolrServer) getSolrServer();
server.deleteByQuery("*:*");
server.commit();
assertNumFound("*:*", 0); // make sure it got in
// two docs, one with uniqueKey, another without it
String json = "{\"id\":\"abc1\", \"name\": \"name1\"} {\"name\" : \"name2\"}";
HttpClient httpClient = server.getHttpClient();
HttpPost post = new HttpPost(server.getBaseURL() + "/update/json/docs");
post.setHeader("Content-Type", "application/json");
post.setEntity(new InputStreamEntity(new ByteArrayInputStream(json.getBytes("UTF-8")), -1));
HttpResponse response = httpClient.execute(post);
assertEquals(200, response.getStatusLine().getStatusCode());
server.commit();
QueryResponse rsp = getSolrServer().query(new SolrQuery("*:*"));
assertEquals(2,rsp.getResults().getNumFound());
SolrDocument doc = rsp.getResults().get(0);
String src = (String) doc.getFieldValue("_src");
Map m = (Map) ObjectBuilder.fromJSON(src);
assertEquals("abc1",m.get("id"));
assertEquals("name1",m.get("name"));
doc = rsp.getResults().get(1);
src = (String) doc.getFieldValue("_src");
m = (Map) ObjectBuilder.fromJSON(src);
assertEquals("name2",m.get("name"));
}
}