diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 673b5a7c63d..4d9e6a0c547 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -135,6 +135,8 @@ New Features * SOLR-9324: Support Secure Impersonation / Proxy User for solr authentication (Gregory Chanan, Hrishikesh Gadre via yonik) +* SOLR-9721: javabin Tuple parser for streaming and other end points (noble) + Optimizations ---------------------- * SOLR-9704: Facet Module / JSON Facet API: Optimize blockChildren facets that have diff --git a/solr/core/src/java/org/apache/solr/handler/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/ExportWriter.java index 52010cea2e6..e432f9473e5 100644 --- a/solr/core/src/java/org/apache/solr/handler/ExportWriter.java +++ b/solr/core/src/java/org/apache/solr/handler/ExportWriter.java @@ -51,10 +51,13 @@ import org.apache.solr.common.MapWriter.EntryWriter; import org.apache.solr.common.PushWriter; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.JavaBinCodec; import org.apache.solr.core.SolrCore; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrRequestInfo; +import org.apache.solr.response.BinaryResponseWriter; import org.apache.solr.response.JSONResponseWriter; +import org.apache.solr.response.QueryResponseWriter; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.schema.BoolField; import org.apache.solr.schema.FieldType; @@ -125,8 +128,14 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable { } public void write(OutputStream os) throws IOException { - respWriter = new OutputStreamWriter(os, StandardCharsets.UTF_8); - writer = JSONResponseWriter.getPushWriter(respWriter, req, res); + QueryResponseWriter rw = req.getCore().getResponseWriters().get(wt); + if (rw instanceof BinaryResponseWriter) { + //todo add support for other writers after testing + writer = new JavaBinCodec(os, null); + } else { + respWriter = new OutputStreamWriter(os, StandardCharsets.UTF_8); + writer = JSONResponseWriter.getPushWriter(respWriter, req, res); + } Exception exception = res.getException(); if (exception != null) { if (!(exception instanceof IgnoreException)) { diff --git a/solr/core/src/test/org/apache/solr/response/SmileWriterTest.java b/solr/core/src/test/org/apache/solr/response/SmileWriterTest.java index 77c263a0e6b..871f8c42b62 100644 --- a/solr/core/src/test/org/apache/solr/response/SmileWriterTest.java +++ b/solr/core/src/test/org/apache/solr/response/SmileWriterTest.java @@ -147,13 +147,8 @@ public class SmileWriterTest extends SolrTestCaseJ4 { @Test public void test10Docs() throws IOException { - SolrDocumentList l = new SolrDocumentList(); - for(int i=0;i<10; i++){ - l.add(sampleDoc(random(), i)); - } - SolrQueryResponse response = new SolrQueryResponse(); - response.getValues().add("results", l); + SolrDocumentList l = constructSolrDocList(response); ByteArrayOutputStream baos = new ByteArrayOutputStream(); new SmileResponseWriter().write(baos, new LocalSolrQueryRequest(null, new ModifiableSolrParams()), response); @@ -171,6 +166,16 @@ public class SmileWriterTest extends SolrTestCaseJ4 { } + public static SolrDocumentList constructSolrDocList(SolrQueryResponse response) { + SolrDocumentList l = new SolrDocumentList(); + for(int i=0;i<10; i++){ + l.add(sampleDoc(random(), i)); + } + + response.getValues().add("results", l); + return l; + } + public static SolrDocument sampleDoc(Random r, int bufnum) { SolrDocument sdoc = new SolrDocument(); sdoc.put("id", "my_id_" + bufnum); diff --git a/solr/core/src/test/org/apache/solr/response/TestJavabinTupleStreamParser.java b/solr/core/src/test/org/apache/solr/response/TestJavabinTupleStreamParser.java new file mode 100644 index 00000000000..d71044603ea --- /dev/null +++ b/solr/core/src/test/org/apache/solr/response/TestJavabinTupleStreamParser.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.response; + + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.JavabinTupleStreamParser; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.util.JavaBinCodec; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.common.util.Utils; + +import static org.apache.solr.response.SmileWriterTest.constructSolrDocList; + +public class TestJavabinTupleStreamParser extends SolrTestCaseJ4 { + + public void testKnown() throws IOException { + String payload = "{\n" + + " \"responseHeader\":{\n" + + " \"zkConnected\":true,\n" + + " \"status\":0,\n" + + " \"QTime\":46},\n" + + " \"response\":{\n" + + " \"numFound\":2,\n" + + " \"start\":0,\n" + + " \"docs\":[\n" + + " {\n" + + " \"id\":\"2\",\n" + + " \"a_s\":\"hello2\",\n" + + " \"a_i\":2,\n" + + " \"a_f\":0.0},\n" + + " {\n" + + " \"id\":\"3\",\n" + + " \"a_s\":\"hello3\",\n" + + " \"a_i\":3,\n" + + " \"a_f\":3.0}]}}"; + SimpleOrderedMap nl = convert2OrderedMap((Map) Utils.fromJSONString(payload)); + + byte[] bytes = serialize(nl); + + JavabinTupleStreamParser parser = new JavabinTupleStreamParser(new ByteArrayInputStream(bytes), true); + Map map = parser.next(); + assertEquals("2", map.get("id")); + map = parser.next(); + assertEquals("3", map.get("id")); + System.out.println(); + map = parser.next(); + assertNull(map); + + } + + public SimpleOrderedMap convert2OrderedMap(Map m) { + SimpleOrderedMap result = new SimpleOrderedMap<>(); + m.forEach((k, v) -> { + if (v instanceof List) v = ((List) v).iterator(); + if (v instanceof Map) v = convert2OrderedMap((Map) v); + result.add((String) k, v); + }); + return result; + + } + + public void testSimple() throws IOException { + List> l = new ArrayList(); + l.add(Utils.makeMap("id", 1, "f", 1.0f, "s", "Some str 1")); + l.add(Utils.makeMap("id", 2, "f", 2.0f, "s", "Some str 2")); + l.add(Utils.makeMap("id", 3, "f", 1.0f, "s", "Some str 3")); + l.add(Utils.makeMap("EOF", true, "RESPONSE_TIME", 206, "sleepMillis", 1000)); + Iterator> iterator = l.iterator(); + TupleStream tupleStream = new TupleStream() { + @Override + public void setStreamContext(StreamContext context) { + + } + + @Override + public List children() { + return null; + } + + @Override + public void open() throws IOException { + } + + @Override + public void close() throws IOException { + } + + @Override + public Tuple read() throws IOException { + if (iterator.hasNext()) return new Tuple(iterator.next()); + else return null; + } + + @Override + public StreamComparator getStreamSort() { + return null; + } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + return new StreamExplanation(getStreamNodeId().toString()) + .withFunctionName("Dummy") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE) + .withExpression("--non-expressible--"); + } + }; + + byte[] bytes = serialize(tupleStream); + JavabinTupleStreamParser parser = new JavabinTupleStreamParser(new ByteArrayInputStream(bytes), true); + Map m = parser.next(); + assertEquals(1L, m.get("id")); + assertEquals(1.0, (Double) m.get("f"), 0.01); + m = parser.next(); + assertEquals(2L, m.get("id")); + assertEquals(2.0, (Double) m.get("f"), 0.01); + m = parser.next(); + assertEquals(3L, m.get("id")); + assertEquals(1.0, (Double) m.get("f"), 0.01); + m = parser.next(); + assertEquals(Boolean.TRUE, m.get("EOF")); + + parser = new JavabinTupleStreamParser(new ByteArrayInputStream(bytes), false); + m = parser.next(); + assertEquals(1, m.get("id")); + assertEquals(1.0, (Float) m.get("f"), 0.01); + m = parser.next(); + assertEquals(2, m.get("id")); + assertEquals(2.0, (Float) m.get("f"), 0.01); + m = parser.next(); + assertEquals(3, m.get("id")); + assertEquals(1.0, (Float) m.get("f"), 0.01); + m = parser.next(); + assertEquals(Boolean.TRUE, m.get("EOF")); + } + + public void testSolrDocumentList() throws IOException { + SolrQueryResponse response = new SolrQueryResponse(); + SolrDocumentList l = constructSolrDocList(response); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + new JavaBinCodec().marshal(response.getValues(), baos); + byte[] bytes = serialize(response.getValues()); + Object o = new JavaBinCodec().unmarshal(new ByteArrayInputStream(bytes)); + List list = new ArrayList<>(); + Map m = null; + JavabinTupleStreamParser parser = new JavabinTupleStreamParser(new ByteArrayInputStream(bytes), false); + while ((m = parser.next()) != null) { + list.add(m); + } + assertEquals(l.size(), list.size()); + for(int i =0;i) list.get(i))); + } + + } + public static byte[] serialize(Object o) throws IOException { + SolrQueryResponse response = new SolrQueryResponse(); + response.getValues().add("results", o); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + new JavaBinCodec().marshal(response.getValues(), baos); + return baos.toByteArray(); + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JavabinTupleStreamParser.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JavabinTupleStreamParser.java new file mode 100644 index 00000000000..dfe8cc7e186 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JavabinTupleStreamParser.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.stream; + + +import java.io.IOException; +import java.io.InputStream; +import java.time.Instant; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.solr.common.util.DataInputInputStream; +import org.apache.solr.common.util.FastInputStream; +import org.apache.solr.common.util.JavaBinCodec; + +public class JavabinTupleStreamParser extends JavaBinCodec implements TupleStreamParser { + private final InputStream is; + final FastInputStream fis; + private int arraySize = Integer.MAX_VALUE; + private boolean onlyJsonTypes = false; + int objectSize; + + + public JavabinTupleStreamParser(InputStream is, boolean onlyJsonTypes) throws IOException { + this.onlyJsonTypes = onlyJsonTypes; + this.is = is; + this.fis = initRead(is); + if (!readTillDocs()) arraySize = 0; + } + + + private boolean readTillDocs() throws IOException { + if (isObjectType(fis)) { + if (tagByte == SOLRDOCLST) { + readVal(fis);// this is the metadata, throw it away + tagByte = fis.readByte(); + arraySize = readSize(fis); + return true; + } + for (int i = objectSize; i > 0; i--) { + Object k = readVal(fis); + if (k == END_OBJ) break; + if ("docs".equals(k)) { + tagByte = fis.readByte(); + if (tagByte == ITERATOR) return true;//docs must be an iterator or + if (tagByte >>> 5 == ARR >>> 5) {// an array + arraySize = readSize(fis); + return true; + } + return false; + } else { + if (readTillDocs()) return true; + } + } + } else { + readObject(fis); + return false; + } + return false; + + //here after it will be a stream of maps + } + + private boolean isObjectType(DataInputInputStream dis) throws IOException { + tagByte = dis.readByte(); + if (tagByte >>> 5 == ORDERED_MAP >>> 5 || + tagByte >>> 5 == NAMED_LST >>> 5) { + objectSize = readSize(dis); + return true; + } + if (tagByte == MAP) { + objectSize = readVInt(dis); + return true; + } + if (tagByte == MAP_ENTRY_ITER) { + objectSize = Integer.MAX_VALUE; + return true; + } + return tagByte == SOLRDOCLST; + } + + private Map readAsMap(DataInputInputStream dis) throws IOException { + int sz = readSize(dis); + Map m = new LinkedHashMap<>(); + for (int i = 0; i < sz; i++) { + String name = (String) readVal(dis); + Object val = readVal(dis); + m.put(name, val); + } + return m; + } + + private Map readSolrDocumentAsMap(DataInputInputStream dis) throws IOException { + tagByte = dis.readByte(); + int size = readSize(dis); + Map doc = new LinkedHashMap<>(); + for (int i = 0; i < size; i++) { + String fieldName; + Object obj = readVal(dis); // could be a field name, or a child document + if (obj instanceof Map) { + List l = (List) doc.get("_childDocuments_"); + if (l == null) doc.put("_childDocuments_", l = new ArrayList()); + l.add(obj); + continue; + } else { + fieldName = (String) obj; + } + Object fieldVal = readVal(dis); + doc.put(fieldName, fieldVal); + } + return doc; + } + + @Override + protected Object readObject(DataInputInputStream dis) throws IOException { + if (tagByte == SOLRDOC) { + return readSolrDocumentAsMap(dis); + } + if (onlyJsonTypes) { + switch (tagByte >>> 5) { + case SINT >>> 5: + int i = readSmallInt(dis); + return (long) i; + case ORDERED_MAP >>> 5: + return readAsMap(dis); + case NAMED_LST >>> 5: + return readAsMap(dis); + } + + switch (tagByte) { + case INT: { + int i = dis.readInt(); + return (long) i; + } + case FLOAT: { + float v = dis.readFloat(); + return (double) v; + } + case BYTE: { + byte b = dis.readByte(); + return (long) b; + } + case SHORT: { + short s = dis.readShort(); + return (long) s; + } + + case DATE: { + return Instant.ofEpochMilli(dis.readLong()).toString(); + } + + default: + return super.readObject(dis); + } + } else return super.readObject(dis); + } + + + @Override + public Map next() throws IOException { + if (arraySize == 0) return null; + Object o = readVal(fis); + arraySize--; + if (o == END_OBJ) return null; + return (Map) o; + } + + @Override + public void close() throws IOException { + is.close(); + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java index 6a217034002..f132815ba89 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java @@ -274,15 +274,18 @@ public class SolrStream extends TupleStream { } String wt = requestParams.get(CommonParams.WT, "json"); - assert CommonParams.JSON.equals(wt); QueryRequest query = new QueryRequest(requestParams); query.setPath(p); query.setResponseParser(new InputStreamResponseParser(wt)); query.setMethod(SolrRequest.METHOD.POST); NamedList genericResponse = server.request(query); InputStream stream = (InputStream) genericResponse.get("stream"); - InputStreamReader reader = new InputStreamReader(stream, "UTF-8"); - return new JSONTupleStream(reader); + if (CommonParams.JAVABIN.equals(wt)) { + return new JavabinTupleStreamParser(stream, true); + } else { + InputStreamReader reader = new InputStreamReader(stream, "UTF-8"); + return new JSONTupleStream(reader); + } } diff --git a/solr/solrj/src/java/org/apache/solr/common/SolrDocument.java b/solr/solrj/src/java/org/apache/solr/common/SolrDocument.java index b4af06f903b..2e69b8250c5 100644 --- a/solr/solrj/src/java/org/apache/solr/common/SolrDocument.java +++ b/solr/solrj/src/java/org/apache/solr/common/SolrDocument.java @@ -50,6 +50,10 @@ public class SolrDocument extends SolrDocumentBase impleme _fields = new LinkedHashMap<>(); } + public SolrDocument(Map fields) { + this._fields = fields; + } + /** * @return a list of field names defined in this document - this Collection is directly backed by this SolrDocument. * @see #keySet diff --git a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java index 32ccf360934..3e054d7cce9 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java @@ -169,6 +169,11 @@ public class JavaBinCodec implements PushWriter { byte version; public Object unmarshal(InputStream is) throws IOException { + FastInputStream dis = initRead(is); + return readVal(dis); + } + + protected FastInputStream initRead(InputStream is) throws IOException { assert !alreadyUnmarshalled; FastInputStream dis = FastInputStream.wrap(is); version = dis.readByte(); @@ -176,9 +181,9 @@ public class JavaBinCodec implements PushWriter { throw new RuntimeException("Invalid version (expected " + VERSION + ", but " + version + ") or the data in not in 'javabin' format"); } - + alreadyUnmarshalled = true; - return readVal(dis); + return dis; } @@ -243,7 +248,10 @@ public class JavaBinCodec implements PushWriter { public Object readVal(DataInputInputStream dis) throws IOException { tagByte = dis.readByte(); + return readObject(dis); + } + protected Object readObject(DataInputInputStream dis) throws IOException { // if ((tagByte & 0xe0) == 0) { // if top 3 bits are clear, this is a normal tag diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java index 619845646f0..d4304095190 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java @@ -2213,7 +2213,7 @@ public void testTrace() throws Exception { for (int idx = 0; idx < vals.length; idx += 2) { params.add(vals[idx], vals[idx + 1]); } - + if(random().nextBoolean()) params.add("wt","javabin"); return params; }