SOLR-13731: javabin must support a 1:1 mapping of the JSON update format

This commit is contained in:
Noble Paul 2019-10-14 11:07:38 +11:00 committed by noble
parent 845c3e9775
commit 41779e5f60
8 changed files with 224 additions and 62 deletions

View File

@ -38,7 +38,9 @@ New Features
Improvements
---------------------
(No changes)
*SOLR-13731: javabin must support a 1:1 mapping of the JSON update format (noble)
Optimizations
---------------------

View File

@ -40,6 +40,7 @@ import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.CHILDDOC;
import static org.apache.solr.common.util.ByteArrayUtf8CharSequence.convertCharSeq;
/**
@ -81,7 +82,7 @@ public class JavaBinUpdateRequestCodec {
if(updateRequest.getDocIterator() != null){
docIter = updateRequest.getDocIterator();
}
Map<SolrInputDocument,Map<String,Object>> docMap = updateRequest.getDocumentsMap();
nl.add("params", params);// 0: params
@ -125,7 +126,7 @@ public class JavaBinUpdateRequestCodec {
try (JavaBinCodec codec = new StreamingCodec(namedList, updateRequest, handler)) {
codec.unmarshal(is);
}
// NOTE: if the update request contains only delete commands the params
// must be loaded now
if(updateRequest.getParams()==null) {
@ -145,11 +146,11 @@ public class JavaBinUpdateRequestCodec {
} else {
docMap = (List<Entry<SolrInputDocument, Map<Object, Object>>>) docsMapObj;
}
// we don't add any docs, because they were already processed
// deletes are handled later, and must be passed back on the UpdateRequest
if (delById != null) {
for (String s : delById) {
updateRequest.deleteById(s);
@ -167,7 +168,7 @@ public class JavaBinUpdateRequestCodec {
} else {
updateRequest.deleteById(entry.getKey());
}
}
}
if (delByQ != null) {
@ -175,7 +176,7 @@ public class JavaBinUpdateRequestCodec {
updateRequest.deleteByQuery(s);
}
}
return updateRequest;
}
@ -282,7 +283,9 @@ public class JavaBinUpdateRequestCodec {
private List readOuterMostDocIterator(DataInputInputStream fis) throws IOException {
if(namedList[0] == null) namedList[0] = new NamedList();
NamedList params = (NamedList) namedList[0].get("params");
if (params == null) params = new NamedList();
updateRequest.setParams(new ModifiableSolrParams(params.toSolrParams()));
if (handler == null) return super.readIterator(fis);
Integer commitWithin = null;
@ -313,8 +316,10 @@ public class JavaBinUpdateRequestCodec {
commitWithin = (Integer) p.get(UpdateRequest.COMMIT_WITHIN);
overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE);
}
} else {
} else if (o instanceof SolrInputDocument) {
sdoc = (SolrInputDocument) o;
} else if (o instanceof Map) {
sdoc = convertMapToSolrInputDoc((Map) o);
}
// peek at the next object to see if we're at the end
@ -333,5 +338,26 @@ public class JavaBinUpdateRequestCodec {
}
}
private SolrInputDocument convertMapToSolrInputDoc(Map m) {
SolrInputDocument result = createSolrInputDocument(m.size());
m.forEach((k, v) -> {
if (CHILDDOC.equals(k.toString())) {
if (v instanceof List) {
List list = (List) v;
for (Object o : list) {
if (o instanceof Map) {
result.addChildDocument(convertMapToSolrInputDoc((Map) o));
}
}
} else if (v instanceof Map) {
result.addChildDocument(convertMapToSolrInputDoc((Map) v));
}
} else {
result.addField(k.toString(), v);
}
});
return result;
}
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.common;
import java.io.IOException;
import java.util.Set;
import java.util.function.Predicate;
public class ConditionalKeyMapWriter implements MapWriter {
private final MapWriter delegate;
private final Predicate<CharSequence> predicate;
public ConditionalKeyMapWriter(MapWriter delegate, Predicate<CharSequence> predicate) {
this.delegate = delegate;
this.predicate = predicate;
}
public static class EntryWriterWrapper implements EntryWriter {
private final EntryWriter delegate;
private final Predicate<CharSequence> predicate;
public EntryWriterWrapper(EntryWriter delegate, Predicate<CharSequence> predicate) {
this.delegate = delegate;
this.predicate = predicate;
}
@Override
public EntryWriter put(CharSequence k, Object v) throws IOException {
if (predicate.test(k)) delegate.put(k, v);
return this;
}
@Override
public EntryWriter put(CharSequence k, int v) throws IOException {
if (predicate.test(k)) delegate.put(k, v);
return this;
}
@Override
public EntryWriter put(CharSequence k, long v) throws IOException {
if (predicate.test(k)) delegate.put(k, v);
return this;
}
@Override
public EntryWriter put(CharSequence k, float v) throws IOException {
if (predicate.test(k)) delegate.put(k, v);
return this;
}
@Override
public EntryWriter put(CharSequence k, double v) throws IOException {
if (predicate.test(k)) delegate.put(k, v);
return this;
}
@Override
public EntryWriter put(CharSequence k, boolean v) throws IOException {
if (predicate.test(k)) delegate.put(k, v);
return this;
}
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
if (delegate != null) delegate.writeMap(new EntryWriterWrapper(ew, predicate));
}
public static Predicate<CharSequence> dedupeKeyPredicate(Set<CharSequence> keys) {
return (k) -> keys.add(k);
}
}

View File

@ -26,6 +26,8 @@ import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import org.apache.solr.common.params.CommonParams;
/**
* Represent the field-value information needed to construct and index
* a Lucene Document. Like the SolrDocument, the field values should
@ -57,6 +59,9 @@ public class SolrInputDocument extends SolrDocumentBase<SolrInputField, SolrInpu
bc.accept(k, o);
};
_fields.forEach(wrapper);
if (_childDocuments != null) {
ew.put(CommonParams.CHILDDOC, _childDocuments);
}
}
public SolrInputDocument(Map<String,SolrInputField> fields) {

View File

@ -234,7 +234,7 @@ public interface CommonParams {
}
return null;
}
};
}
/** which parameters to log (if not supplied all parameters will be logged) **/
String LOG_PARAMS_LIST = "logParamsList";
@ -299,5 +299,7 @@ public interface CommonParams {
String FILE = "file";
String FILES = "files";
String CHILDDOC = "_childDocuments_";
}

View File

@ -37,8 +37,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.solr.common.ConditionalMapWriter;
import org.apache.solr.common.ConditionalKeyMapWriter;
import org.apache.solr.common.EnumFieldValue;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.IteratorWriter.ItemWriter;
@ -49,6 +50,7 @@ import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.CommonParams;
import org.noggit.CharArr;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -531,7 +533,7 @@ public class JavaBinCodec implements PushWriter {
//use this to ignore the writable interface because , child docs will ignore the fl flag
// is it a good design?
private boolean ignoreWritable =false;
private ConditionalMapWriter.EntryWriterWrapper cew;
private MapWriter.EntryWriter cew;
public void writeSolrDocument(SolrDocument doc) throws IOException {
List<SolrDocument> children = doc.getChildDocuments();
@ -546,7 +548,7 @@ public class JavaBinCodec implements PushWriter {
int sz = fieldsCount + (children==null ? 0 : children.size());
writeTag(SOLRDOC);
writeTag(ORDERED_MAP, sz);
if (cew == null) cew = new ConditionalMapWriter.EntryWriterWrapper(ew, (k, o) -> toWrite(k.toString()));
if (cew == null) cew = new ConditionalKeyMapWriter.EntryWriterWrapper(ew, (k) -> toWrite(k.toString()));
doc.writeMap(cew);
if (children != null) {
try {
@ -649,13 +651,14 @@ public class JavaBinCodec implements PushWriter {
protected SolrInputDocument createSolrInputDocument(int sz) {
return new SolrInputDocument(new LinkedHashMap<>(sz));
}
static final Predicate<CharSequence> IGNORECHILDDOCS = it -> !CommonParams.CHILDDOC.equals(it.toString());
public void writeSolrInputDocument(SolrInputDocument sdoc) throws IOException {
List<SolrInputDocument> children = sdoc.getChildDocuments();
int sz = sdoc.size() + (children==null ? 0 : children.size());
writeTag(SOLRINPUTDOC, sz);
writeFloat(1f); // document boost
sdoc.writeMap(ew);
sdoc.writeMap(new ConditionalKeyMapWriter.EntryWriterWrapper(ew,IGNORECHILDDOCS));
if (children != null) {
for (SolrInputDocument child : children) {
writeSolrInputDocument(child);

View File

@ -417,6 +417,9 @@ public class NamedList<T> implements Cloneable, Serializable, Iterable<Map.Entry
}
public Map<String,T> asShallowMap() {
return asShallowMap(false);
}
public Map<String,T> asShallowMap(boolean allowDps) {
return new Map<String, T>() {
@Override
public int size() {
@ -444,13 +447,17 @@ public class NamedList<T> implements Cloneable, Serializable, Iterable<Map.Entry
@Override
public T put(String key, T value) {
if (allowDps) {
NamedList.this.add(key, value);
return null;
}
int idx = NamedList.this.indexOf(key, 0);
if (idx == -1) {
NamedList.this.add(key, value);
} else {
NamedList.this.setVal(idx, value);
}
return null;
return null;
}
@Override

View File

@ -18,28 +18,33 @@ package org.apache.solr.client.solrj.request;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import junit.framework.Assert;
import org.apache.solr.SolrTestCase;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.Utils;
import org.junit.Test;
import static org.apache.solr.common.params.CommonParams.CHILDDOC;
/**
* Test for UpdateRequestCodec
*
* @since solr 1.4
*
* @see org.apache.solr.client.solrj.request.UpdateRequest
* @since solr 1.4
*/
public class TestUpdateRequestCodec extends SolrTestCase {
@ -74,7 +79,7 @@ public class TestUpdateRequestCodec extends SolrTestCase {
Collection<String> foobar = new HashSet<>();
foobar.add("baz1");
foobar.add("baz2");
doc.addField("foobar",foobar);
doc.addField("foobar", foobar);
updateRequest.add(doc);
// updateRequest.setWaitFlush(true);
@ -84,15 +89,12 @@ public class TestUpdateRequestCodec extends SolrTestCase {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
codec.marshal(updateRequest, baos);
final List<SolrInputDocument> docs = new ArrayList<>();
JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
@Override
public void update(SolrInputDocument document, UpdateRequest req, Integer commitWithin, Boolean overwrite) {
Assert.assertNotNull(req.getParams());
docs.add(document);
}
JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = (document, req, commitWithin, overwrite) -> {
Assert.assertNotNull(req.getParams());
docs.add(document);
};
UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()) ,handler);
UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()), handler);
for (SolrInputDocument document : docs) {
updateUnmarshalled.add(document);
@ -100,12 +102,12 @@ public class TestUpdateRequestCodec extends SolrTestCase {
for (int i = 0; i < updateRequest.getDocuments().size(); i++) {
SolrInputDocument inDoc = updateRequest.getDocuments().get(i);
SolrInputDocument outDoc = updateUnmarshalled.getDocuments().get(i);
compareDocs("doc#"+i, inDoc, outDoc);
compareDocs("doc#" + i, inDoc, outDoc);
}
Assert.assertEquals(updateUnmarshalled.getDeleteById().get(0) ,
updateRequest.getDeleteById().get(0));
Assert.assertEquals(updateUnmarshalled.getDeleteQuery().get(0) ,
updateRequest.getDeleteQuery().get(0));
Assert.assertEquals(updateUnmarshalled.getDeleteById().get(0),
updateRequest.getDeleteById().get(0));
Assert.assertEquals(updateUnmarshalled.getDeleteQuery().get(0),
updateRequest.getDeleteQuery().get(0));
assertEquals("b", updateUnmarshalled.getParams().get("a"));
}
@ -125,10 +127,7 @@ public class TestUpdateRequestCodec extends SolrTestCase {
doc.addField("desc", "one");
// imagine someone adding a custom Bean that implements Iterable
// but is not a Collection
doc.addField("iter", new Iterable<String>() {
@Override
public Iterator<String> iterator() { return values.iterator(); }
});
doc.addField("iter", (Iterable<String>) values::iterator);
doc.addField("desc", "1");
updateRequest.add(doc);
@ -136,16 +135,13 @@ public class TestUpdateRequestCodec extends SolrTestCase {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
codec.marshal(updateRequest, baos);
final List<SolrInputDocument> docs = new ArrayList<>();
JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
@Override
public void update(SolrInputDocument document, UpdateRequest req, Integer commitWithin, Boolean overwrite) {
Assert.assertNotNull(req.getParams());
docs.add(document);
}
JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = (document, req, commitWithin, overwrite) -> {
Assert.assertNotNull(req.getParams());
docs.add(document);
};
UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()) ,handler);
UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()), handler);
for (SolrInputDocument document : docs) {
updateUnmarshalled.add(document);
}
@ -154,12 +150,50 @@ public class TestUpdateRequestCodec extends SolrTestCase {
SolrInputField iter = outDoc.getField("iter");
Assert.assertNotNull("iter field is null", iter);
Object iterVal = iter.getValue();
Assert.assertTrue("iterVal is not a Collection",
iterVal instanceof Collection);
Assert.assertTrue("iterVal is not a Collection",
iterVal instanceof Collection);
Assert.assertEquals("iterVal contents", values, iterVal);
}
//this format accepts a 1:1 mapping of the json format and javabin format
public void testStreamableInputDocFormat() throws IOException {
Map m = Utils.makeMap("id","1","desc" ,"The desc 1");
m.put(CHILDDOC, (MapWriter) ew -> {
ew.put("id","1.1");
ew.put("desc" ,"The desc 1.1");
ew.put(CHILDDOC, (IteratorWriter) iw -> {
iw.add(Utils.makeMap("id", "1.1.1","desc","The desc 1.1.1"));
iw.add((MapWriter) ew1 -> {
ew1.put("id", "1.1.2");
ew1.put("desc", "The desc 1.1.2");
});
});
});
MapWriter m2 = ew -> {
ew.put("id", "2");
ew.put("des", "The desc 2");
};
List l = new ArrayList();
l.add(m);
l.add(m2);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
new JavaBinCodec().marshal(l.iterator(), baos);
List<SolrInputDocument> l2 = new ArrayList();
new JavaBinUpdateRequestCodec().unmarshal(new ByteArrayInputStream(baos.toByteArray()), (document, req, commitWithin, override) -> l2.add(document));
assertEquals(l2.get(0).getChildDocuments().size(), 1);
Object o = Utils.fromJSONString(Utils.writeJson(l.get(0), new StringWriter(), true).toString());
Object cdoc = Utils.getObjectByPath(o, false, CHILDDOC);
assertEquals(Utils.writeJson(cdoc, new StringWriter(), true).toString(),
Utils.writeJson(l2.get(0).getChildDocuments().get(0) ,new StringWriter(), true).toString());
}
@Test
// commented out on: 24-Dec-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 20-Sep-2018
@ -193,24 +227,20 @@ public class TestUpdateRequestCodec extends SolrTestCase {
Collection<String> foobar = new HashSet<>();
foobar.add("baz1");
foobar.add("baz2");
doc.addField("foobar",foobar);
doc.addField("foobar", foobar);
updateRequest.add(doc);
updateRequest.deleteById("2");
updateRequest.deleteByQuery("id:3");
InputStream is = getClass().getResourceAsStream("/solrj/updateReq_4_5.bin");
assertNotNull("updateReq_4_5.bin was not found", is);
UpdateRequest updateUnmarshalled = new JavaBinUpdateRequestCodec().unmarshal(is, new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
@Override
public void update(SolrInputDocument document, UpdateRequest req, Integer commitWithin, Boolean override) {
if(commitWithin == null ){
req.add(document);
}
System.err.println("Doc" + document + " ,commitWithin:"+commitWithin+ " , override:"+ override);
UpdateRequest updateUnmarshalled = new JavaBinUpdateRequestCodec().unmarshal(is, (document, req, commitWithin, override) -> {
if (commitWithin == null) {
req.add(document);
}
System.err.println("Doc" + document + " ,commitWithin:" + commitWithin + " , override:" + override);
});
System.err.println(updateUnmarshalled.getDocumentsMap());
@ -219,11 +249,11 @@ public class TestUpdateRequestCodec extends SolrTestCase {
for (int i = 0; i < updateRequest.getDocuments().size(); i++) {
SolrInputDocument inDoc = updateRequest.getDocuments().get(i);
SolrInputDocument outDoc = updateUnmarshalled.getDocuments().get(i);
compareDocs("doc#"+i, inDoc, outDoc);
compareDocs("doc#" + i, inDoc, outDoc);
}
Assert.assertEquals(updateUnmarshalled.getDeleteById().get(0) ,
Assert.assertEquals(updateUnmarshalled.getDeleteById().get(0),
updateRequest.getDeleteById().get(0));
Assert.assertEquals(updateUnmarshalled.getDeleteQuery().get(0) ,
Assert.assertEquals(updateUnmarshalled.getDeleteQuery().get(0),
updateRequest.getDeleteQuery().get(0));
assertEquals("b", updateUnmarshalled.getParams().get("a"));
@ -231,9 +261,8 @@ public class TestUpdateRequestCodec extends SolrTestCase {
}
private void compareDocs(String m,
SolrInputDocument expectedDoc,
private void compareDocs(String m,
SolrInputDocument expectedDoc,
SolrInputDocument actualDoc) {
for (String s : expectedDoc.getFieldNames()) {
@ -252,7 +281,7 @@ public class TestUpdateRequestCodec extends SolrTestCase {
}
Assert.assertEquals(m + " diff values for field: " + s,
expectedVal, actualVal);
expectedVal, actualVal);
}
}