SOLR-2656, SOLR-2700: transaction logging and realtime-get

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1165869 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2011-09-06 21:37:32 +00:00
parent ab554500bd
commit 1465c3edd4
19 changed files with 1296 additions and 82 deletions

View File

@ -165,6 +165,11 @@ New Features
* SOLR-1032: CSV handler now supports "literal.field_name=value" parameters.
(Simon Rosenthal, ehatcher)
* SOLR-2656: realtime-get, efficiently retrieves the latest stored fields for specified
documents, even if they are not yet searchable (i.e. without reopening a searcher)
(yonik)
Optimizations
----------------------

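For orientation, the realtime-get feature added here can be exercised from a client roughly as follows. This is a hedged sketch, not part of the commit: it assumes the /get handler registered in the example solrconfig further down, the SolrJ client of this era (CommonsHttpSolrServer), and an illustrative core URL and document id.

import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.params.ModifiableSolrParams;

public class RealTimeGetExample {
  public static void main(String[] args) throws Exception {
    SolrServer server = new CommonsHttpSolrServer("http://localhost:8983/solr");

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("qt", "/get");  // route the request to the RealTimeGetHandler
    params.set("id", "1");     // a single id returns a "doc" entry rather than a doc list

    QueryResponse rsp = server.query(params);
    SolrDocument doc = (SolrDocument) rsp.getResponse().get("doc");
    System.out.println(doc);   // latest stored fields, even before a commit
  }
}

With multiple ids (the "ids" parameter), the handler instead returns a standard "response" doc list, as the new RealTimeGetComponent below shows.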
View File

@ -30,6 +30,7 @@ import org.apache.solr.search.FastLRUCache;
import org.apache.solr.search.QParserPlugin;
import org.apache.solr.search.ValueSourceParser;
import org.apache.solr.update.SolrIndexConfig;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.spelling.QueryConverter;
import org.apache.lucene.search.BooleanQuery;
@ -202,6 +203,7 @@ public class SolrConfig extends Config {
loadPluginInfo(CodecProviderFactory.class,"mainIndex/codecProviderFactory",false, false);
loadPluginInfo(IndexReaderFactory.class,"indexReaderFactory",false, true);
loadPluginInfo(UpdateRequestProcessorChain.class,"updateRequestProcessorChain",false, false);
loadPluginInfo(UpdateLog.class,"updateHandler/updateLog",false, false);
updateHandlerInfo = loadUpdatehandlerInfo();

View File

@ -889,6 +889,7 @@ public final class SolrCore implements SolrInfoMBean {
addIfNotPresent(components,MoreLikeThisComponent.COMPONENT_NAME,MoreLikeThisComponent.class);
addIfNotPresent(components,StatsComponent.COMPONENT_NAME,StatsComponent.class);
addIfNotPresent(components,DebugComponent.COMPONENT_NAME,DebugComponent.class);
addIfNotPresent(components,RealTimeGetComponent.COMPONENT_NAME,RealTimeGetComponent.class);
return components;
}
private <T> void addIfNotPresent(Map<String ,T> registry, String name, Class<? extends T> c){

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler;
import org.apache.solr.handler.component.*;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
/**
* Returns the latest stored fields for the specified document(s), even if
* the documents are not yet visible to searchers (i.e. not yet committed),
* by delegating to {@link RealTimeGetComponent}.
* <p>
* The following request parameters are supported, and may also be configured
* in solrconfig.xml as defaults:
* </p>
*
* <ul>
* <li> id - the unique key of a single document to retrieve</li>
* <li> ids - a comma-separated list of unique keys to retrieve; multiple ids
* parameters may also be specified</li>
* </ul>
*
*/
public class RealTimeGetHandler extends SearchHandler {
@Override
protected List<String> getDefaultComponents()
{
List<String> names = new ArrayList<String>(1);
names.add(RealTimeGetComponent.COMPONENT_NAME);
return names;
}
//////////////////////// SolrInfoMBeans methods //////////////////////
@Override
public String getVersion() {
return "$Revision$";
}
@Override
public String getDescription() {
return "The realtime get handler";
}
@Override
public String getSourceId() {
return "$Id$";
}
@Override
public String getSource() {
return "$URL$";
}
@Override
public URL[] getDocs() {
return null;
}
}

View File

@ -0,0 +1,230 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.component;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.ReturnFields;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.DocumentBuilder;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.RefCounted;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
/**
* TODO!
*
*
* @since solr 4.0
*/
public class RealTimeGetComponent extends SearchComponent
{
public static final String COMPONENT_NAME = "get";
@Override
public void prepare(ResponseBuilder rb) throws IOException {
// Set field flags
ReturnFields returnFields = new ReturnFields( rb.req );
rb.rsp.setReturnFields( returnFields );
}
@Override
public void process(ResponseBuilder rb) throws IOException
{
SolrQueryRequest req = rb.req;
SolrQueryResponse rsp = rb.rsp;
SolrParams params = req.getParams();
if (!params.getBool(COMPONENT_NAME, true)) {
return;
}
String id[] = params.getParams("id");
String ids[] = params.getParams("ids");
if (id == null && ids == null) {
return;
}
String[] allIds = id==null ? new String[0] : id;
if (ids != null) {
List<String> lst = new ArrayList<String>();
for (String s : allIds) {
lst.add(s);
}
for (String idList : ids) {
lst.addAll( StrUtils.splitSmart(idList, ",", true) );
}
allIds = lst.toArray(new String[lst.size()]);
}
SchemaField idField = req.getSchema().getUniqueKeyField();
FieldType fieldType = idField.getType();
SolrDocumentList docList = new SolrDocumentList();
UpdateLog ulog = req.getCore().getUpdateHandler().getUpdateLog();
RefCounted<SolrIndexSearcher> searcherHolder = null;
try {
SolrIndexSearcher searcher = null;
BytesRef idBytes = new BytesRef();
for (String idStr : allIds) {
fieldType.readableToIndexed(idStr, idBytes);
if (ulog != null) {
Object o = ulog.lookup(idBytes);
if (o != null) {
// should currently be a List<Oper,Ver,Doc/Id>
List entry = (List)o;
assert entry.size() >= 3;
int oper = (Integer)entry.get(0);
switch (oper) {
case UpdateLog.ADD:
docList.add(toSolrDoc((SolrInputDocument)entry.get(entry.size()-1), req.getSchema()));
break;
case UpdateLog.DELETE:
break;
default:
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
}
continue;
}
}
// didn't find it in the update log, so it should be in the newest searcher opened
if (searcher == null) {
searcherHolder = req.getCore().getNewestSearcher(false);
searcher = searcherHolder.get();
}
int docid = searcher.getFirstMatch(new Term(idField.getName(), idBytes));
if (docid < 0) continue;
Document luceneDocument = searcher.doc(docid);
docList.add(toSolrDoc(luceneDocument, req.getSchema()));
}
} finally {
if (searcherHolder != null) {
searcherHolder.decref();
}
}
// if the client specified a single id=foo, then use "doc":{
// otherwise use a standard doclist
if (ids == null && allIds.length <= 1) {
// if the doc was not found, then use a value of null.
rsp.add("doc", docList.size() > 0 ? docList.get(0) : null);
} else {
docList.setNumFound(docList.size());
rsp.add("response", docList);
}
}
private static SolrDocument toSolrDoc(Document doc, IndexSchema schema) {
SolrDocument out = new SolrDocument();
for( IndexableField f : doc.getFields() ) {
// Make sure multivalued fields are represented as lists
Object existing = out.get(f.name());
if (existing == null) {
SchemaField sf = schema.getFieldOrNull(f.name());
if (sf != null && sf.multiValued()) {
List<Object> vals = new ArrayList<Object>();
vals.add( f );
out.setField( f.name(), vals );
}
else{
out.setField( f.name(), f );
}
}
else {
out.addField( f.name(), f );
}
}
return out;
}
private static SolrDocument toSolrDoc(SolrInputDocument sdoc, IndexSchema schema) {
// TODO: do something more performant than this double conversion
Document doc = DocumentBuilder.toDocument(sdoc, schema);
List<IndexableField> fields = doc.getFields();
// copy the stored fields only
Document out = new Document();
for (IndexableField f : doc.getFields()) {
if (f.stored()) {
out.add(f);
}
}
return toSolrDoc(out, schema);
}
////////////////////////////////////////////
/// SolrInfoMBean
////////////////////////////////////////////
@Override
public String getDescription() {
return "query";
}
@Override
public String getVersion() {
return "$Revision$";
}
@Override
public String getSourceId() {
return "$Id$";
}
@Override
public String getSource() {
return "$URL$";
}
@Override
public URL[] getDocs() {
return null;
}
}

View File

@ -46,7 +46,10 @@ public final class QueryResultKey {
int h = query.hashCode();
if (filters != null) h ^= filters.hashCode();
if (filters != null) {
for (Query filt : filters)
h += filt.hashCode();
}
sfields = (this.sort !=null) ? this.sort.getSort() : defaultSort;
for (SortField sf : sfields) {

View File

@ -24,7 +24,10 @@ import java.io.IOException;
import java.net.URL;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
@ -53,6 +56,7 @@ import org.apache.solr.search.SolrIndexSearcher;
*/
public class DirectUpdateHandler2 extends UpdateHandler {
protected SolrCoreState indexWriterProvider;
protected final Lock commitLock = new ReentrantLock();
// stats
AtomicLong addCommands = new AtomicLong();
@ -110,6 +114,8 @@ public class DirectUpdateHandler2 extends UpdateHandler {
int softCommitTimeUpperBound = updateHandlerInfo.autoSoftCommmitMaxTime; // getInt("updateHandler/autoSoftCommit/maxTime", -1);
softCommitTracker = new CommitTracker("Soft", core, softCommitDocsUpperBound, softCommitTimeUpperBound, true, true);
this.ulog = updateHandler.getUpdateLog();
this.ulog.init(this, core);
}
private void deleteAll() throws IOException {
@ -163,6 +169,12 @@ public class DirectUpdateHandler2 extends UpdateHandler {
writer.addDocument(cmd.getLuceneDocument());
}
// Add to the transaction log *after* successfully adding to the index, if there was no error.
// This ordering ensures that if we log it, it has definitely been added to the index.
// This also ensures that if a commit sneaks in-between, that we know everything in a particular
// log version was definitely committed.
ulog.add(cmd);
rc = 1;
} finally {
if (rc!=1) {
@ -185,6 +197,8 @@ public class DirectUpdateHandler2 extends UpdateHandler {
indexWriterProvider.getIndexWriter(core).deleteDocuments(new Term(idField.getName(), cmd.getIndexedId()));
ulog.delete(cmd);
if (commitTracker.getTimeUpperBound() > 0) {
commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
}
@ -216,7 +230,9 @@ public class DirectUpdateHandler2 extends UpdateHandler {
} else {
indexWriterProvider.getIndexWriter(core).deleteDocuments(q);
}
ulog.deleteByQuery(cmd);
madeIt = true;
if (commitTracker.getTimeUpperBound() > 0) {
@ -278,6 +294,11 @@ public class DirectUpdateHandler2 extends UpdateHandler {
boolean error=true;
try {
// only allow one hard commit to proceed at once
if (!cmd.softCommit) {
commitLock.lock();
}
log.info("start "+cmd);
if (cmd.optimize) {
@ -287,6 +308,10 @@ public class DirectUpdateHandler2 extends UpdateHandler {
}
if (!cmd.softCommit) {
synchronized (this) { // sync is currently needed to prevent preCommit from being called between preSoft and postSoft... see postSoft comments.
ulog.preCommit(cmd);
}
writer.commit();
numDocsPending.set(0);
callPostCommitCallbacks();
@ -300,13 +325,23 @@ public class DirectUpdateHandler2 extends UpdateHandler {
}
synchronized (this) {
if (cmd.softCommit) {
core.getSearcher(true,false,waitSearcher, true);
// ulog.preSoftCommit();
synchronized (this) {
ulog.preSoftCommit(cmd);
core.getSearcher(true,false,waitSearcher, true);
ulog.postSoftCommit(cmd);
}
// ulog.postSoftCommit();
} else {
core.getSearcher(true,false,waitSearcher);
synchronized (this) {
ulog.preSoftCommit(cmd);
core.getSearcher(true,false,waitSearcher);
ulog.postSoftCommit(cmd);
}
ulog.postCommit(cmd); // postCommit currently means new searcher has also been opened
}
}
// reset commit tracking
@ -321,6 +356,10 @@ public class DirectUpdateHandler2 extends UpdateHandler {
error=false;
}
finally {
if (!cmd.softCommit) {
commitLock.unlock();
}
addCommands.set(0);
deleteByIdCommands.set(0);
deleteByQueryCommands.set(0);
@ -396,6 +435,10 @@ public class DirectUpdateHandler2 extends UpdateHandler {
}
}
@Override
public UpdateLog getUpdateLog() {
return ulog;
}
@Override
public void close() throws IOException {

View File

@ -0,0 +1,595 @@
package org.apache.solr.update;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.FastOutputStream;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
class NullUpdateLog extends UpdateLog {
@Override
public void init(PluginInfo info) {
}
@Override
public void init(UpdateHandler uhandler, SolrCore core) {
}
@Override
public void add(AddUpdateCommand cmd) {
}
@Override
public void delete(DeleteUpdateCommand cmd) {
}
@Override
public void deleteByQuery(DeleteUpdateCommand cmd) {
}
@Override
public void preCommit(CommitUpdateCommand cmd) {
}
@Override
public void postCommit(CommitUpdateCommand cmd) {
}
@Override
public void preSoftCommit(CommitUpdateCommand cmd) {
}
@Override
public void postSoftCommit(CommitUpdateCommand cmd) {
}
@Override
public Object lookup(BytesRef indexedId) {
return null;
}
@Override
public void close() {
}
}
public class FSUpdateLog extends UpdateLog {
public static String TLOG_NAME="tlog";
long id = -1;
private TransactionLog tlog;
private TransactionLog prevTlog;
private Map<BytesRef,LogPtr> map = new HashMap<BytesRef, LogPtr>();
private Map<BytesRef,LogPtr> prevMap; // used while committing/reopening is happening
private Map<BytesRef,LogPtr> prevMap2; // used while committing/reopening is happening
private TransactionLog prevMapLog; // the transaction log used to look up entries found in prevMap
private TransactionLog prevMapLog2; // the transaction log used to look up entries found in prevMap2
private String[] tlogFiles;
private File tlogDir;
private Collection<String> globalStrings;
private String dataDir;
private String lastDataDir;
@Override
public void init(PluginInfo info) {
dataDir = (String)info.initArgs.get("dir");
}
public void init(UpdateHandler uhandler, SolrCore core) {
if (dataDir == null || dataDir.length()==0) {
dataDir = core.getDataDir();
}
if (dataDir.equals(lastDataDir)) {
// on a normal reopen, we currently shouldn't have to do anything
return;
}
lastDataDir = dataDir;
tlogDir = new File(dataDir, TLOG_NAME);
tlogDir.mkdirs();
tlogFiles = getLogList(tlogDir);
id = getLastLogId() + 1; // add 1 since we will create a new log for the next update
}
static class LogPtr {
final long pointer;
public LogPtr(long pointer) {
this.pointer = pointer;
}
public String toString() {
return "LogPtr(" + pointer + ")";
}
}
public static String[] getLogList(File directory) {
final String prefix = TLOG_NAME+'.';
String[] names = directory.list(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith(prefix);
}
});
Arrays.sort(names);
return names;
}
public long getLastLogId() {
if (id != -1) return id;
if (tlogFiles.length == 0) return -1;
String last = tlogFiles[tlogFiles.length-1];
return Long.parseLong(last.substring(TLOG_NAME.length()+1));
}
@Override
public void add(AddUpdateCommand cmd) {
synchronized (this) {
ensureLog();
long pos = tlog.write(cmd);
LogPtr ptr = new LogPtr(pos);
map.put(cmd.getIndexedId(), ptr);
// System.out.println("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
}
}
@Override
public void delete(DeleteUpdateCommand cmd) {
BytesRef br = cmd.getIndexedId();
synchronized (this) {
ensureLog();
long pos = tlog.writeDelete(cmd);
LogPtr ptr = new LogPtr(pos);
map.put(br, ptr);
// System.out.println("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
}
}
@Override
public void deleteByQuery(DeleteUpdateCommand cmd) {
synchronized (this) {
ensureLog();
// TODO: how to support realtime-get, optimistic concurrency, or anything else in this case?
// Maybe we shouldn't?
// realtime-get could just do a reopen of the searcher
// optimistic concurrency? Maybe we shouldn't support deleteByQuery w/ optimistic concurrency
long pos = tlog.writeDeleteByQuery(cmd);
LogPtr ptr = new LogPtr(pos);
// System.out.println("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
}
}
private void newMap() {
prevMap2 = prevMap;
prevMapLog2 = prevMapLog;
prevMap = map;
prevMapLog = tlog;
map = new HashMap<BytesRef, LogPtr>();
}
private void clearOldMaps() {
prevMap = null;
prevMap2 = null;
}
@Override
public void preCommit(CommitUpdateCommand cmd) {
synchronized (this) {
// since we're changing the log, we must change the map.
newMap();
// since document additions can happen concurrently with commit, create
// a new transaction log first so that we know the old one is definitely
// in the index.
prevTlog = tlog;
tlog = null;
id++;
if (prevTlog != null) {
globalStrings = prevTlog.getGlobalStrings();
}
}
}
@Override
public void postCommit(CommitUpdateCommand cmd) {
synchronized (this) {
if (prevTlog != null) {
prevTlog.decref();
prevTlog = null;
}
}
}
@Override
public void preSoftCommit(CommitUpdateCommand cmd) {
synchronized (this) {
if (!cmd.softCommit) return; // already handled this at the start of the hard commit
newMap();
// start adding documents to a new map since we won't know if
// any added documents will make it into this commit or not.
// But we do know that any updates already added will definitely
// show up in the latest reader after the commit succeeds.
map = new HashMap<BytesRef, LogPtr>();
// System.out.println("TLOG: preSoftCommit: prevMap="+ System.identityHashCode(prevMap) + " new map=" + System.identityHashCode(map));
}
}
@Override
public void postSoftCommit(CommitUpdateCommand cmd) {
synchronized (this) {
// We can clear out all old maps now that a new searcher has been opened.
// This currently only works since DUH2 synchronizes around preCommit to avoid
// it being called in the middle of a preSoftCommit, postSoftCommit sequence.
// If this DUH2 synchronization were to be removed, preSoftCommit should
// record what old maps were created and only remove those.
clearOldMaps();
// System.out.println("TLOG: postSoftCommit: disposing of prevMap="+ System.identityHashCode(prevMap));
}
}
@Override
public Object lookup(BytesRef indexedId) {
LogPtr entry;
TransactionLog lookupLog;
synchronized (this) {
entry = map.get(indexedId);
lookupLog = tlog; // something found in "map" will always be in "tlog"
// System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in map " + System.identityHashCode(map) + " got " + entry + " lookupLog=" + lookupLog);
if (entry == null && prevMap != null) {
entry = prevMap.get(indexedId);
// something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
lookupLog = prevMapLog;
// System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap " + System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog);
}
if (entry == null && prevMap2 != null) {
entry = prevMap2.get(indexedId);
// something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
lookupLog = prevMapLog2;
// System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap2 " + System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog);
}
if (entry == null) {
return null;
}
lookupLog.incref();
}
try {
// now do the lookup outside of the sync block for concurrency
return lookupLog.lookup(entry.pointer);
} finally {
lookupLog.decref();
}
}
private void ensureLog() {
if (tlog == null) {
String newLogName = String.format("%s.%019d", TLOG_NAME, id);
tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
}
}
@Override
public void close() {
synchronized (this) {
if (prevTlog != null) {
prevTlog.decref();
}
if (tlog != null) {
tlog.decref();
}
}
}
}
/**
* Log Format: List{Operation, Version, ...}
* ADD, VERSION, DOC
* DELETE, VERSION, ID_BYTES
* DELETE_BY_QUERY, VERSION, String
*
* TODO: keep two files, one for [operation, version, id] and the other for the actual
* document data. That way we could throw away document log files more readily
* while retaining the smaller operation log files longer (and we can retrieve
* the stored fields from the latest documents from the index).
*
* This would require keeping all source fields stored of course.
*
* This would also make it possible to skip logging document data for requests
* with commit=true, since we know that if such a request succeeds, all docs in it will be committed.
*
*/
class TransactionLog {
long id;
File tlogFile;
RandomAccessFile raf;
FileChannel channel;
OutputStream os;
FastOutputStream fos;
InputStream is;
long start;
volatile boolean deleteOnClose = true; // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
AtomicInteger refcount = new AtomicInteger(1);
Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
List<String> globalStringList = new ArrayList<String>();
// write a BytesRef as a byte array
JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
@Override
public Object resolve(Object o, JavaBinCodec codec) throws IOException {
if (o instanceof BytesRef) {
BytesRef br = (BytesRef)o;
codec.writeByteArray(br.bytes, br.offset, br.length);
return null;
}
return o;
}
};
public class LogCodec extends JavaBinCodec {
public LogCodec() {
super(resolver);
}
@Override
public void writeExternString(String s) throws IOException {
if (s == null) {
writeTag(NULL);
return;
}
// no need to synchronize globalStringMap - it's only updated before the first record is written to the log
Integer idx = globalStringMap.get(s);
if (idx == null) {
// write a normal string
writeStr(s);
} else {
// write the extern string
writeTag(EXTERN_STRING, idx);
}
}
@Override
public String readExternString(FastInputStream fis) throws IOException {
int idx = readSize(fis);
if (idx != 0) {// idx != 0 is the index of the extern string
// no need to synchronize globalStringList - it's only updated before the first record is written to the log
return globalStringList.get(idx - 1);
} else {// idx == 0 means it has a string value
// this shouldn't happen with this codec subclass.
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
}
}
}
public long writeData(Object o) {
LogCodec codec = new LogCodec();
try {
long pos = start + fos.size(); // if we had flushed, this should be equal to channel.position()
codec.init(fos);
codec.writeVal(o);
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
TransactionLog(File tlogFile, Collection<String> globalStrings) {
try {
this.tlogFile = tlogFile;
raf = new RandomAccessFile(this.tlogFile, "rw");
start = raf.length();
channel = raf.getChannel();
os = Channels.newOutputStream(channel);
fos = FastOutputStream.wrap(os);
addGlobalStrings(globalStrings);
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
private void addGlobalStrings(Collection<String> strings) {
if (strings == null) return;
int origSize = globalStringMap.size();
for (String s : strings) {
Integer idx = null;
if (origSize > 0) {
idx = globalStringMap.get(s);
}
if (idx != null) continue; // already in list
globalStringList.add(s);
globalStringMap.put(s, globalStringList.size());
}
assert globalStringMap.size() == globalStringList.size();
}
Collection<String> getGlobalStrings() {
synchronized (fos) {
return new ArrayList<String>(globalStringList);
}
}
private void writeLogHeader(LogCodec codec) throws IOException {
NamedList header = new NamedList<Object>();
header.add("SOLR_TLOG",1); // a magic string + version number?
header.add("strings",globalStringList);
codec.marshal(header, fos);
}
public long write(AddUpdateCommand cmd) {
LogCodec codec = new LogCodec();
synchronized (fos) {
try {
long pos = start + fos.size(); // if we had flushed, this should be equal to channel.position()
SolrInputDocument sdoc = cmd.getSolrInputDocument();
if (pos == 0) { // TODO: needs to be changed if we start writing a header first
addGlobalStrings(sdoc.getFieldNames());
pos = start + fos.size();
}
codec.init(fos);
codec.writeTag(JavaBinCodec.ARR, 3);
codec.writeInt(UpdateLog.ADD); // should just take one byte
codec.writeLong(0); // the version... should also just be one byte if 0
codec.writeSolrInputDocument(cmd.getSolrInputDocument());
// fos.flushBuffer(); // flush later
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
}
public long writeDelete(DeleteUpdateCommand cmd) {
LogCodec codec = new LogCodec();
synchronized (fos) {
try {
long pos = start + fos.size(); // if we had flushed, this should be equal to channel.position()
if (pos == 0) {
writeLogHeader(codec);
pos = start + fos.size();
}
codec.init(fos);
codec.writeTag(JavaBinCodec.ARR, 3);
codec.writeInt(UpdateLog.DELETE); // should just take one byte
codec.writeLong(0); // the version... should also just be one byte if 0
BytesRef br = cmd.getIndexedId();
codec.writeByteArray(br.bytes, br.offset, br.length);
// fos.flushBuffer(); // flush later
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
}
public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
LogCodec codec = new LogCodec();
synchronized (fos) {
try {
long pos = start + fos.size(); // if we had flushed, this should be equal to channel.position()
if (pos == 0) {
writeLogHeader(codec);
pos = start + fos.size();
}
codec.init(fos);
codec.writeTag(JavaBinCodec.ARR, 3);
codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte
codec.writeLong(0); // the version... should also just be one byte if 0
codec.writeStr(cmd.query);
// fos.flushBuffer(); // flush later
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
}
/* This method is thread safe */
public Object lookup(long pos) {
try {
// make sure any unflushed buffer has been flushed
synchronized (fos) {
// TODO: optimize this by keeping track of what we have flushed up to
fos.flushBuffer();
}
ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos);
LogCodec codec = new LogCodec();
return codec.readVal(fis);
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
public void incref() {
refcount.incrementAndGet();
}
public void decref() {
if (refcount.decrementAndGet() == 0) {
close();
}
}
private void close() {
try {
fos.flush();
fos.close();
if (deleteOnClose) {
tlogFile.delete();
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
public String toString() {
return tlogFile.toString();
}
}
class ChannelFastInputStream extends FastInputStream {
FileChannel ch;
long chPosition;
public ChannelFastInputStream(FileChannel ch, long chPosition) {
super(null);
this.ch = ch;
this.chPosition = chPosition;
}
@Override
public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
ByteBuffer bb = ByteBuffer.wrap(target, offset, len);
int ret = ch.read(bb, chPosition);
if (ret >= 0) {
chPosition += ret;
}
return ret;
}
@Override
public void close() throws IOException {
ch.close();
}
}

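The prevMap/prevMap2 rotation above is the subtle part of FSUpdateLog: lookups must keep seeing documents that were in flight when a commit started, until the new searcher is actually registered. A minimal toy sketch of that rotation (my own illustration, not Solr code; names and types are simplified):

import java.util.HashMap;
import java.util.Map;

class MapRotationSketch {
  Map<String, Long> map = new HashMap<String, Long>(); // pointers for docs added since the last commit started
  Map<String, Long> prevMap;                           // snapshot taken when a (soft) commit starts
  Map<String, Long> prevMap2;                          // one more generation, for overlapping soft commits

  synchronized void add(String id, long pointer) { map.put(id, pointer); }

  synchronized void onCommitStart() {   // analogous to preCommit/preSoftCommit
    prevMap2 = prevMap;
    prevMap = map;
    map = new HashMap<String, Long>();
  }

  synchronized void onNewSearcher() {   // analogous to postSoftCommit
    prevMap = null;                     // the index itself can now serve these docs
    prevMap2 = null;
  }

  synchronized Long lookup(String id) { // analogous to UpdateLog.lookup
    Long p = map.get(id);
    if (p == null && prevMap != null) p = prevMap.get(id);
    if (p == null && prevMap2 != null) p = prevMap2.get(id);
    return p;
  }

  public static void main(String[] args) {
    MapRotationSketch s = new MapRotationSketch();
    s.add("doc1", 0L);
    s.onCommitStart();                    // doc1 moves to prevMap but remains visible
    System.out.println(s.lookup("doc1")); // 0
    s.onNewSearcher();
    System.out.println(s.lookup("doc1")); // null -> caller falls through to the searcher
  }
}

In the real implementation the values are LogPtr file offsets paired with the transaction log they point into (prevMapLog/prevMapLog2), which is why lookup() increments that log's refcount before reading outside the synchronized block.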
View File

@ -19,6 +19,7 @@ package org.apache.solr.update;
import java.io.IOException;
import java.util.List;
import java.util.Vector;
import org.apache.solr.core.PluginInfo;
@ -54,6 +55,8 @@ public abstract class UpdateHandler implements SolrInfoMBean {
protected Vector<SolrEventListener> softCommitCallbacks = new Vector<SolrEventListener>();
protected Vector<SolrEventListener> optimizeCallbacks = new Vector<SolrEventListener>();
protected UpdateLog ulog;
/**
* Called when a SolrCore using this UpdateHandler is closed.
*/
@ -81,6 +84,19 @@ public abstract class UpdateHandler implements SolrInfoMBean {
}
}
private void initLog() {
PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName());
if (ulogPluginInfo != null && ulogPluginInfo.isEnabled()) {
ulog = core.createInitInstance(ulogPluginInfo, UpdateLog.class, "update log", "solr.NullUpdateLog");
} else {
ulog = new NullUpdateLog();
ulog.init(null);
}
ulog.init(this, core);
}
protected void callPostCommitCallbacks() {
for (SolrEventListener listener : commitCallbacks) {
listener.postCommit();
@ -105,6 +121,7 @@ public abstract class UpdateHandler implements SolrInfoMBean {
idField = schema.getUniqueKeyField();
idFieldType = idField!=null ? idField.getType() : null;
parseEventListeners();
initLog();
}
/**
@ -133,7 +150,7 @@ public abstract class UpdateHandler implements SolrInfoMBean {
public abstract void commit(CommitUpdateCommand cmd) throws IOException;
public abstract void rollback(RollbackUpdateCommand cmd) throws IOException;
public abstract void close() throws IOException;
public abstract UpdateLog getUpdateLog();
/**
* NOTE: this function is not thread safe. However, it is safe to call within the

View File

@ -0,0 +1,22 @@
package org.apache.solr.update;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.core.SolrCore;
import org.apache.solr.util.plugin.PluginInfoInitialized;
public abstract class UpdateLog implements PluginInfoInitialized {
public static final int ADD = 0x00;
public static final int DELETE = 0x01;
public static final int DELETE_BY_QUERY = 0x02;
public abstract void init(UpdateHandler uhandler, SolrCore core);
public abstract void add(AddUpdateCommand cmd);
public abstract void delete(DeleteUpdateCommand cmd);
public abstract void deleteByQuery(DeleteUpdateCommand cmd);
public abstract void preCommit(CommitUpdateCommand cmd);
public abstract void postCommit(CommitUpdateCommand cmd);
public abstract void preSoftCommit(CommitUpdateCommand cmd);
public abstract void postSoftCommit(CommitUpdateCommand cmd);
public abstract Object lookup(BytesRef indexedId);
public abstract void close();
}

View File

@ -0,0 +1,38 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<config>
<luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
<requestHandler name="standard" class="solr.StandardRequestHandler">
</requestHandler>
<requestHandler name="/get" class="solr.RealTimeGetHandler">
<lst name="defaults">
<str name="omitHeader">true</str>
</lst>
</requestHandler>
<updateHandler class="solr.DirectUpdateHandler2">
<updateLog class="solr.FSUpdateLog">
<!-- <str name="dir">/tmp/solr/</str> -->
</updateLog>
</updateHandler>
</config>

View File

@ -252,6 +252,13 @@
<bool name="httpCaching">true</bool>
</requestHandler>
<requestHandler name="/get" class="solr.RealTimeGetHandler">
<lst name="defaults">
<str name="omitHeader">true</str>
</lst>
</requestHandler>
<requestHandler name="dismax" class="solr.SearchHandler" >
<lst name="defaults">
<str name="defType">dismax</str>

View File

@ -116,8 +116,10 @@ public class MinimalSchemaTest extends SolrTestCaseJ4 {
assertQ("failure w/handler: '" + handler + "'",
req("qt", handler,
// this should be fairly innoculous for any type of query
"q", "foo:bar")
// this should be fairly innocuous for any type of query
"q", "foo:bar",
"omitHeader", "false"
)
,"//lst[@name='responseHeader']"
);
} catch (Exception e) {

View File

@ -16,25 +16,80 @@
*/
package org.apache.solr.search;
import org.apache.noggit.ObjectBuilder;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.request.SolrQueryRequest;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.Ignore;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestRealTimeGet extends SolrTestCaseJ4 {
@BeforeClass
public static void beforeClass() throws Exception {
initCore("solrconfig.xml","schema12.xml");
initCore("solrconfig-tlog.xml","schema12.xml");
}
@Test
public void testGetRealtime() throws Exception {
clearIndex();
assertU(commit());
assertU(adoc("id","1"));
assertJQ(req("q","id:1")
,"/response/numFound==0"
);
assertJQ(req("qt","/get","id","1")
,"=={'doc':{'id':'1'}}"
);
assertJQ(req("qt","/get","ids","1")
,"=={" +
" 'response':{'numFound':1,'start':0,'docs':[" +
" {" +
" 'id':'1'}]" +
" }}}"
);
assertU(commit());
assertJQ(req("q","id:1")
,"/response/numFound==1"
);
assertJQ(req("qt","/get","id","1")
,"=={'doc':{'id':'1'}}"
);
assertJQ(req("qt","/get","ids","1")
,"=={" +
" 'response':{'numFound':1,'start':0,'docs':[" +
" {" +
" 'id':'1'}]" +
" }}}"
);
assertU(delI("1"));
assertJQ(req("q","id:1")
,"/response/numFound==1"
);
assertJQ(req("qt","/get","id","1")
,"=={'doc':null}"
);
assertJQ(req("qt","/get","ids","1")
,"=={'response':{'numFound':0,'start':0,'docs':[]}}"
);
}
/***
@Test
public void testGetRealtime() throws Exception {
@ -69,6 +124,19 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
***/
public static void verbose(Object... args) {
if (!VERBOSE) return;
StringBuilder sb = new StringBuilder("TEST:");
sb.append(Thread.currentThread().getName());
sb.append(':');
for (Object o : args) {
sb.append(' ');
sb.append(o.toString());
}
System.out.println(sb.toString());
}
final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
long snapshotCount;
@ -94,20 +162,24 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
@Test
public void testStressGetRealtime() throws Exception {
// update variables
final int commitPercent = 10;
final int softCommitPercent = 50; // what percent of the commits are soft
final int deletePercent = 8;
final int deleteByQueryPercent = 4;
final int ndocs = 100;
int nWriteThreads = 10;
clearIndex();
assertU(commit());
final int commitPercent = 5 + random.nextInt(20);
final int softCommitPercent = 30+random.nextInt(60); // what percent of the commits are soft
final int deletePercent = 4+random.nextInt(25);
final int deleteByQueryPercent = 0; // real-time get isn't currently supported with delete-by-query
final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
int nWriteThreads = 5 + random.nextInt(25);
final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
// query variables
final int percentRealtimeQuery = 0; // realtime get is not implemented yet
final AtomicLong operations = new AtomicLong(atLeast(10000)); // number of query operations to perform in total
int nReadThreads = 10;
// query variables
final int percentRealtimeQuery = 60;
final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
int nReadThreads = 5 + random.nextInt(25);
initModel(ndocs);
@ -121,6 +193,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
@Override
public void run() {
try {
while (operations.get() > 0) {
int oper = rand.nextInt(100);
@ -134,14 +207,22 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
version = snapshotCount++;
}
if (rand.nextInt(100) < softCommitPercent)
if (rand.nextInt(100) < softCommitPercent) {
verbose("softCommit start");
assertU(h.commit("softCommit","true"));
else
verbose("softCommit end");
} else {
verbose("commit start");
assertU(commit());
verbose("commit end");
}
synchronized(TestRealTimeGet.this) {
// install this snapshot only if it's newer than the current one
// install this model snapshot only if it's newer than the current one
if (version >= committedModelClock) {
if (VERBOSE) {
verbose("installing new committedModel version="+committedModelClock);
}
committedModel = newCommittedModel;
committedModelClock = version;
}
@ -169,14 +250,37 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
long nextVal = Math.abs(val)+1;
if (oper < commitPercent + deletePercent) {
if (VERBOSE) {
verbose("deleting id",id,"val=",nextVal);
}
assertU("<delete><id>" + id + "</id></delete>");
model.put(id, -nextVal);
if (VERBOSE) {
verbose("deleting id", id, "val=",nextVal,"DONE");
}
} else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
if (VERBOSE) {
verbose("deleteByQuery id ",id, "val=",nextVal);
}
assertU("<delete><query>id:" + id + "</query></delete>");
model.put(id, -nextVal);
if (VERBOSE) {
verbose("deleteByQuery id",id, "val=",nextVal,"DONE");
}
} else {
if (VERBOSE) {
verbose("adding id", id, "val=", nextVal);
}
assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
model.put(id, nextVal);
if (VERBOSE) {
verbose("adding id", id, "val=", nextVal,"DONE");
}
}
}
@ -184,6 +288,11 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
lastId = id;
}
}
} catch (Throwable e) {
operations.set(-1L);
SolrException.log(log, e);
fail(e.getMessage());
}
}
};
@ -199,7 +308,6 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
public void run() {
try {
while (operations.decrementAndGet() >= 0) {
int oper = rand.nextInt(100);
// bias toward a recently changed doc
int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
@ -217,6 +325,9 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
}
}
if (VERBOSE) {
verbose("querying id", id);
}
SolrQueryRequest sreq;
if (realTime) {
sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
@ -232,14 +343,17 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
} else {
assertEquals(1, doclist.size());
long foundVal = (Long)(((Map)doclist.get(0)).get(field));
assertTrue(foundVal >= Math.abs(val));
if (foundVal < Math.abs(val)) {
verbose("ERROR, id=", id, "foundVal=",foundVal,"model val=",val);
assertTrue(foundVal >= Math.abs(val));
}
}
}
}
catch (Throwable e) {
operations.set(-1L);
SolrException.log(log,e);
fail(e.toString());
SolrException.log(log, e);
fail(e.getMessage());
}
}
};
@ -255,5 +369,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
for (Thread thread : threads) {
thread.join();
}
}
}

View File

@ -343,6 +343,17 @@
<arr name="env"> <str>MYVAR=val1</str> </arr>
</listener>
-->
<!-- Enables a transaction log, currently used for real-time get.
"dir" - the target directory for transaction logs, defaults to the
solr data directory. -->
<!--
<updateLog class="solr.FSUpdateLog">
<str name="dir">${solr.data.dir:}</str>
</updateLog>
-->
</updateHandler>
<!-- IndexReaderFactory
@ -773,6 +784,17 @@
-->
</requestHandler>
<!-- realtime get handler, guaranteed to return the latest stored fields of
any document, without the need to commit or open a new searcher. The
current implementation relies on the updateLog feature being enabled. -->
<requestHandler name="/get" class="solr.RealTimeGetHandler">
<lst name="defaults">
<str name="omitHeader">true</str>
</lst>
</requestHandler>
<!-- A Robust Example
This example SearchHandler declaration shows off usage of the

View File

@ -66,16 +66,8 @@ public class JavaBinUpdateRequestCodec {
nl.add("delById", updateRequest.getDeleteById());
nl.add("delByQ", updateRequest.getDeleteQuery());
nl.add("docs", docIter);
new JavaBinCodec(){
@Override
public void writeMap(Map val) throws IOException {
if (val instanceof SolrInputDocument) {
writeVal(solrInputDocumentToList((SolrInputDocument) val));
} else {
super.writeMap(val);
}
}
}.marshal(nl, os);
JavaBinCodec codec = new JavaBinCodec();
codec.marshal(nl, os);
}
/**
@ -136,20 +128,23 @@ public class JavaBinUpdateRequestCodec {
while (true) {
Object o = readVal(fis);
if (o == END_OBJ) break;
handler.document(listToSolrInputDocument((List<NamedList>) o), updateRequest);
SolrInputDocument sdoc = (SolrInputDocument)o;
handler.document(sdoc, updateRequest);
}
return Collections.EMPTY_LIST;
}
};
codec.unmarshal(is);
delById = (List<String>) namedList[0].get("delById");
delByQ = (List<String>) namedList[0].get("delByQ");
doclist = (List<List<NamedList>>) namedList[0].get("docs");
doclist = (List) namedList[0].get("docs");
if (doclist != null && !doclist.isEmpty()) {
List<SolrInputDocument> solrInputDocs = new ArrayList<SolrInputDocument>();
for (List<NamedList> n : doclist) {
solrInputDocs.add(listToSolrInputDocument(n));
for (Object o : doclist) {
solrInputDocs.add((SolrInputDocument)o);
}
updateRequest.add(solrInputDocs);
}
@ -167,37 +162,6 @@ public class JavaBinUpdateRequestCodec {
}
private List<NamedList> solrInputDocumentToList(SolrInputDocument doc) {
List<NamedList> l = new ArrayList<NamedList>();
NamedList nl = new NamedList();
nl.add("boost", doc.getDocumentBoost() == 1.0f ? null : doc.getDocumentBoost());
l.add(nl);
Iterator<SolrInputField> it = doc.iterator();
while (it.hasNext()) {
nl = new NamedList();
SolrInputField field = it.next();
nl.add("name", field.getName());
nl.add("val", field.getValue());
nl.add("boost", field.getBoost() == 1.0f ? null : field.getBoost());
l.add(nl);
}
return l;
}
private SolrInputDocument listToSolrInputDocument(List<NamedList> namedList) {
SolrInputDocument doc = new SolrInputDocument();
for (int i = 0; i < namedList.size(); i++) {
NamedList nl = namedList.get(i);
if (i == 0) {
doc.setDocumentBoost(nl.getVal(0) == null ? 1.0f : (Float) nl.getVal(0));
} else {
doc.addField((String) nl.getVal(0),
nl.getVal(1),
nl.getVal(2) == null ? 1.0f : (Float) nl.getVal(2));
}
}
return doc;
}
private NamedList solrParamsToNamedList(SolrParams params) {
if (params == null) return new NamedList();

View File

@ -63,9 +63,13 @@ public class FastInputStream extends InputStream implements DataInput {
return buf[pos++] & 0xff;
}
public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
return in.read(target, offset, len);
}
public void refill() throws IOException {
// this will set end to -1 at EOF
end = in.read(buf, 0, buf.length);
end = readWrappedStream(buf, 0, buf.length);
pos = 0;
}
@ -88,7 +92,7 @@ public class FastInputStream extends InputStream implements DataInput {
// amount left to read is >= buffer size
if (len-r >= buf.length) {
int ret = in.read(b, off+r, len-r);
int ret = readWrappedStream(b, off+r, len-r);
if (ret==-1) return r==0 ? -1 : r;
r += ret;
return r;

View File

@ -181,9 +181,11 @@ public class FastOutputStream extends OutputStream implements DataOutput {
* underlying stream.
*/
public void flushBuffer() throws IOException {
out.write(buf, 0, pos);
written += pos;
pos=0;
if (pos > 0) {
out.write(buf, 0, pos);
written += pos;
pos=0;
}
}
public long size() {

View File

@ -18,6 +18,8 @@ package org.apache.solr.common.util;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import java.io.IOException;
import java.io.InputStream;
@ -58,6 +60,8 @@ public class JavaBinCodec {
*/
END = 15,
SOLRINPUTDOC = 16,
// types that combine tag + length (or other info) in a single byte
TAG_AND_LEN = (byte) (1 << 5),
STR = (byte) (1 << 5),
@ -81,7 +85,7 @@ public class JavaBinCodec {
}
public void marshal(Object nl, OutputStream os) throws IOException {
daos = FastOutputStream.wrap(os);
init(FastOutputStream.wrap(os));
try {
daos.writeByte(VERSION);
writeVal(nl);
@ -90,6 +94,11 @@ public class JavaBinCodec {
}
}
/** expert: sets a new output stream */
public void init(FastOutputStream os) {
daos = os;
}
byte version;
public Object unmarshal(InputStream is) throws IOException {
@ -211,6 +220,8 @@ public class JavaBinCodec {
return readIterator(dis);
case END:
return END_OBJ;
case SOLRINPUTDOC:
return readSolrInputDocument(dis);
}
throw new RuntimeException("Unknown type " + tagByte);
@ -250,6 +261,10 @@ public class JavaBinCodec {
}
return true;
}
if (val instanceof SolrInputDocument) {
writeSolrInputDocument((SolrInputDocument)val);
return true;
}
if (val instanceof Map) {
writeMap((Map) val);
return true;
@ -340,6 +355,40 @@ public class JavaBinCodec {
writeArray(docs);
}
public SolrInputDocument readSolrInputDocument(FastInputStream dis) throws IOException {
int sz = readVInt(dis);
float docBoost = (Float)readVal(dis);
SolrInputDocument sdoc = new SolrInputDocument();
sdoc.setDocumentBoost(docBoost);
for (int i = 0; i < sz; i++) {
float boost = 1.0f;
String fieldName;
Object boostOrFieldName = readVal(dis);
if (boostOrFieldName instanceof Float) {
boost = (Float)boostOrFieldName;
fieldName = (String)readVal(dis);
} else {
fieldName = (String)boostOrFieldName;
}
Object fieldVal = readVal(dis);
sdoc.setField(fieldName, fieldVal, boost);
}
return sdoc;
}
public void writeSolrInputDocument(SolrInputDocument sdoc) throws IOException {
writeTag(SOLRINPUTDOC, sdoc.size());
writeFloat(sdoc.getDocumentBoost());
for (SolrInputField inputField : sdoc.values()) {
if (inputField.getBoost() != 1.0f) {
writeFloat(inputField.getBoost());
}
writeExternString(inputField.getName());
writeVal(inputField.getValue());
}
}
public Map<Object,Object> readMap(FastInputStream dis)
throws IOException {
int sz = readVInt(dis);
@ -539,6 +588,11 @@ public class JavaBinCodec {
return v;
}
public void writeFloat(float val) throws IOException {
daos.writeByte(FLOAT);
daos.writeFloat(val);
}
public boolean writePrimitive(Object val) throws IOException {
if (val == null) {
daos.writeByte(NULL);
@ -553,8 +607,7 @@ public class JavaBinCodec {
writeLong(((Long) val).longValue());
return true;
} else if (val instanceof Float) {
daos.writeByte(FLOAT);
daos.writeFloat(((Float) val).floatValue());
writeFloat(((Float) val).floatValue());
return true;
} else if (val instanceof Date) {
daos.writeByte(DATE);
@ -579,7 +632,7 @@ public class JavaBinCodec {
} else if (val instanceof byte[]) {
writeByteArray((byte[]) val, 0, ((byte[]) val).length);
return true;
}else if (val instanceof ByteBuffer) {
} else if (val instanceof ByteBuffer) {
ByteBuffer buf = (ByteBuffer) val;
writeByteArray(buf.array(),buf.position(),buf.limit() - buf.position());
return true;
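
Taken together, the JavaBinCodec changes above (the new SOLRINPUTDOC tag plus writeSolrInputDocument/readSolrInputDocument) let a SolrInputDocument survive a binary round trip natively, which is what allows JavaBinUpdateRequestCodec to drop its list-of-NamedList conversion. A minimal round-trip sketch, assuming only the classes touched in this commit:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.JavaBinCodec;

public class SolrInputDocRoundTrip {
  public static void main(String[] args) throws Exception {
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", "1");
    doc.addField("name", "example");

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    new JavaBinCodec().marshal(doc, out);   // writes the SOLRINPUTDOC tag, boost, and fields

    Object read = new JavaBinCodec().unmarshal(new ByteArrayInputStream(out.toByteArray()));
    System.out.println(read instanceof SolrInputDocument);              // true with this commit applied
    System.out.println(((SolrInputDocument) read).getFieldValue("id")); // 1
  }
}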