SOLR-9739: JavabinCodec implements PushWriter interface

This commit is contained in:
Noble Paul 2016-11-08 20:06:28 +05:30
parent 6abfad0234
commit bb25214d44
3 changed files with 179 additions and 16 deletions

View File

@ -143,6 +143,8 @@ Other Changes
* SOLR-9717: Refactor '/export' to not hardcode the JSON output and to use an API (noble) * SOLR-9717: Refactor '/export' to not hardcode the JSON output and to use an API (noble)
* SOLR-9739: JavabinCodec implements PushWriter interface (noble)
================== 6.3.0 ================== ================== 6.3.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -18,6 +18,7 @@
package org.apache.solr.response; package org.apache.solr.response;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
@ -29,6 +30,7 @@ import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter; import org.apache.solr.common.MapWriter;
import org.apache.solr.common.PushWriter; import org.apache.solr.common.PushWriter;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.Utils; import org.apache.solr.common.util.Utils;
import org.apache.solr.request.LocalSolrQueryRequest; import org.apache.solr.request.LocalSolrQueryRequest;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -49,8 +51,11 @@ public class TestPushWriter extends SolrTestCaseJ4 {
writeData(pw); writeData(pw);
osw.flush(); osw.flush();
log.info(new String(baos.toByteArray(), "UTF-8")); log.info(new String(baos.toByteArray(), "UTF-8"));
Object m = Utils.fromJSON(baos.toByteArray()); Map m = (Map) Utils.fromJSON(baos.toByteArray());
checkValues((Map) m); checkValues(m);
writeData(new JavaBinCodec(baos= new ByteArrayOutputStream(), null));
m = (Map) new JavaBinCodec().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
checkValues(m);
} }
protected void checkValues(Map m) { protected void checkValues(Map m) {

View File

@ -33,7 +33,11 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.apache.solr.common.EnumFieldValue; import org.apache.solr.common.EnumFieldValue;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.IteratorWriter.ItemWriter;
import org.apache.solr.common.MapSerializable; import org.apache.solr.common.MapSerializable;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.PushWriter;
import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
@ -55,7 +59,7 @@ import org.noggit.CharArr;
* <p> * <p>
* NOTE -- {@link JavaBinCodec} instances cannot be reused for more than one marshall or unmarshall operation. * NOTE -- {@link JavaBinCodec} instances cannot be reused for more than one marshall or unmarshall operation.
*/ */
public class JavaBinCodec { public class JavaBinCodec implements PushWriter {
public static final byte public static final byte
NULL = 0, NULL = 0,
@ -79,7 +83,7 @@ public class JavaBinCodec {
END = 15, END = 15,
SOLRINPUTDOC = 16, SOLRINPUTDOC = 16,
SOLRINPUTDOC_CHILDS = 17, MAP_ENTRY_ITER = 17,
ENUM_FIELD_VALUE = 18, ENUM_FIELD_VALUE = 18,
MAP_ENTRY = 19, MAP_ENTRY = 19,
// types that combine tag + length (or other info) in a single byte // types that combine tag + length (or other info) in a single byte
@ -108,6 +112,16 @@ public class JavaBinCodec {
writableDocFields =null; writableDocFields =null;
} }
/**
* Use this to use this as a PushWriter. ensure that close() is called explicitly after use
*
* @param os The output stream
*/
public JavaBinCodec(OutputStream os, ObjectResolver resolver) throws IOException {
this.resolver = resolver;
initWrite(os);
}
public JavaBinCodec(ObjectResolver resolver) { public JavaBinCodec(ObjectResolver resolver) {
this(resolver, null); this(resolver, null);
} }
@ -127,16 +141,25 @@ public class JavaBinCodec {
} }
public void marshal(Object nl, OutputStream os) throws IOException { public void marshal(Object nl, OutputStream os) throws IOException {
assert !alreadyMarshalled; initWrite(os);
init(FastOutputStream.wrap(os));
try { try {
daos.writeByte(VERSION);
writeVal(nl); writeVal(nl);
} finally { } finally {
finish();
}
}
protected void initWrite(OutputStream os) throws IOException {
assert !alreadyMarshalled;
init(FastOutputStream.wrap(os));
daos.writeByte(VERSION);
}
protected void finish() throws IOException {
closed = true;
daos.flushBuffer(); daos.flushBuffer();
alreadyMarshalled = true; alreadyMarshalled = true;
} }
}
/** expert: sets a new output stream */ /** expert: sets a new output stream */
public void init(FastOutputStream os) { public void init(FastOutputStream os) {
@ -281,6 +304,8 @@ public class JavaBinCodec {
return readEnumFieldValue(dis); return readEnumFieldValue(dis);
case MAP_ENTRY: case MAP_ENTRY:
return readMapEntry(dis); return readMapEntry(dis);
case MAP_ENTRY_ITER:
return readMapIter(dis);
} }
throw new RuntimeException("Unknown type " + tagByte); throw new RuntimeException("Unknown type " + tagByte);
@ -296,6 +321,10 @@ public class JavaBinCodec {
writeSolrDocumentList((SolrDocumentList) val); writeSolrDocumentList((SolrDocumentList) val);
return true; return true;
} }
if (val instanceof IteratorWriter) {
writeIterator((IteratorWriter) val);
return true;
}
if (val instanceof Collection) { if (val instanceof Collection) {
writeArray((Collection) val); writeArray((Collection) val);
return true; return true;
@ -313,6 +342,10 @@ public class JavaBinCodec {
writeSolrInputDocument((SolrInputDocument)val); writeSolrInputDocument((SolrInputDocument)val);
return true; return true;
} }
if (val instanceof MapWriter) {
writeMap((MapWriter) val);
return true;
}
if (val instanceof Map) { if (val instanceof Map) {
writeMap((Map) val); writeMap((Map) val);
return true; return true;
@ -346,6 +379,58 @@ public class JavaBinCodec {
return false; return false;
} }
private final MapWriter.EntryWriter ew = new MapWriter.EntryWriter() {
@Override
public MapWriter.EntryWriter put(String k, Object v) throws IOException {
writeExternString(k);
JavaBinCodec.this.writeVal(v);
return this;
}
@Override
public MapWriter.EntryWriter put(String k, int v) throws IOException {
writeExternString(k);
JavaBinCodec.this.writeInt(v);
return this;
}
@Override
public MapWriter.EntryWriter put(String k, long v) throws IOException {
writeExternString(k);
JavaBinCodec.this.writeLong(v);
return this;
}
@Override
public MapWriter.EntryWriter put(String k, float v) throws IOException {
writeExternString(k);
JavaBinCodec.this.writeFloat(v);
return this;
}
@Override
public MapWriter.EntryWriter put(String k, double v) throws IOException {
writeExternString(k);
JavaBinCodec.this.writeDouble(v);
return this;
}
@Override
public MapWriter.EntryWriter put(String k, boolean v) throws IOException {
writeExternString(k);
writeBoolean(v);
return this;
}
};
public void writeMap(MapWriter val) throws IOException {
writeTag(MAP_ENTRY_ITER);
val.writeMap(ew);
writeTag(END);
}
public void writeTag(byte tag) throws IOException { public void writeTag(byte tag) throws IOException {
daos.writeByte(tag); daos.writeByte(tag);
} }
@ -503,6 +588,17 @@ public class JavaBinCodec {
} }
public Map<Object, Object> readMapIter(DataInputInputStream dis) throws IOException {
Map<Object, Object> m = new LinkedHashMap<>();
for (; ; ) {
Object key = readVal(dis);
if (key == END_OBJ) break;
Object val = readVal(dis);
m.put(key, val);
}
return m;
}
public Map<Object,Object> readMap(DataInputInputStream dis) public Map<Object,Object> readMap(DataInputInputStream dis)
throws IOException { throws IOException {
int sz = readVInt(dis); int sz = readVInt(dis);
@ -516,12 +612,56 @@ public class JavaBinCodec {
return m; return m;
} }
private final ItemWriter itemWriter = new ItemWriter() {
@Override
public ItemWriter add(Object o) throws IOException {
writeVal(o);
return this;
}
@Override
public ItemWriter add(int v) throws IOException {
writeInt(v);
return this;
}
@Override
public ItemWriter add(long v) throws IOException {
writeLong(v);
return this;
}
@Override
public ItemWriter add(float v) throws IOException {
writeFloat(v);
return this;
}
@Override
public ItemWriter add(double v) throws IOException {
writeDouble(v);
return this;
}
@Override
public ItemWriter add(boolean v) throws IOException {
writeBoolean(v);
return this;
}
};
@Override
public void writeIterator(IteratorWriter val) throws IOException {
writeTag(ITERATOR);
val.writeIter(itemWriter);
writeTag(END);
}
public void writeIterator(Iterator iter) throws IOException { public void writeIterator(Iterator iter) throws IOException {
writeTag(ITERATOR); writeTag(ITERATOR);
while (iter.hasNext()) { while (iter.hasNext()) {
writeVal(iter.next()); writeVal(iter.next());
} }
writeVal(END_OBJ); writeTag(END);
} }
public List<Object> readIterator(DataInputInputStream fis) throws IOException { public List<Object> readIterator(DataInputInputStream fis) throws IOException {
@ -644,7 +784,7 @@ public class JavaBinCodec {
/** /**
* write the string as tag+length, with length being the number of UTF-8 bytes * write the string as tag+length, with length being the number of UTF-8 bytes
*/ */
public void writeStr(String s) throws IOException { public void writeStr(CharSequence s) throws IOException {
if (s == null) { if (s == null) {
writeTag(NULL); writeTag(NULL);
return; return;
@ -745,7 +885,7 @@ public class JavaBinCodec {
if (val == null) { if (val == null) {
daos.writeByte(NULL); daos.writeByte(NULL);
return true; return true;
} else if (val instanceof String) { } else if (val instanceof CharSequence) {
writeStr((String) val); writeStr((String) val);
return true; return true;
} else if (val instanceof Number) { } else if (val instanceof Number) {
@ -760,8 +900,7 @@ public class JavaBinCodec {
writeFloat(((Float) val).floatValue()); writeFloat(((Float) val).floatValue());
return true; return true;
} else if (val instanceof Double) { } else if (val instanceof Double) {
daos.writeByte(DOUBLE); writeDouble(((Double) val).doubleValue());
daos.writeDouble(((Double) val).doubleValue());
return true; return true;
} else if (val instanceof Byte) { } else if (val instanceof Byte) {
daos.writeByte(BYTE); daos.writeByte(BYTE);
@ -779,8 +918,7 @@ public class JavaBinCodec {
daos.writeLong(((Date) val).getTime()); daos.writeLong(((Date) val).getTime());
return true; return true;
} else if (val instanceof Boolean) { } else if (val instanceof Boolean) {
if ((Boolean) val) daos.writeByte(BOOL_TRUE); writeBoolean((Boolean) val);
else daos.writeByte(BOOL_FALSE);
return true; return true;
} else if (val instanceof byte[]) { } else if (val instanceof byte[]) {
writeByteArray((byte[]) val, 0, ((byte[]) val).length); writeByteArray((byte[]) val, 0, ((byte[]) val).length);
@ -796,6 +934,16 @@ public class JavaBinCodec {
return false; return false;
} }
protected void writeBoolean(boolean val) throws IOException {
if (val) daos.writeByte(BOOL_TRUE);
else daos.writeByte(BOOL_FALSE);
}
protected void writeDouble(double val) throws IOException {
daos.writeByte(DOUBLE);
daos.writeDouble(val);
}
public void writeMap(Map<?,?> val) throws IOException { public void writeMap(Map<?,?> val) throws IOException {
writeTag(MAP, val.size()); writeTag(MAP, val.size());
@ -1003,4 +1151,12 @@ public class JavaBinCodec {
return hash; return hash;
} }
} }
private boolean closed;
@Override
public void close() throws IOException {
if (closed) return;
finish();
}
} }