diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 5b24460b193..29b227f95e6 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -165,6 +165,11 @@ New Features
 * SOLR-1032: CSV handler now supports "literal.field_name=value" parameters.
   (Simon Rosenthal, ehatcher)
 
+* SOLR-2656: realtime-get: efficiently retrieves the latest stored fields for
+  specified documents, even if they are not yet searchable (i.e. without
+  reopening a searcher). (yonik)
+
+
 Optimizations
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/core/SolrConfig.java b/solr/core/src/java/org/apache/solr/core/SolrConfig.java
index 483433dfbb1..3a9257df356 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrConfig.java
@@ -30,6 +30,7 @@ import org.apache.solr.search.FastLRUCache;
 import org.apache.solr.search.QParserPlugin;
 import org.apache.solr.search.ValueSourceParser;
 import org.apache.solr.update.SolrIndexConfig;
+import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.processor.UpdateRequestProcessorChain;
 import org.apache.solr.spelling.QueryConverter;
 import org.apache.lucene.search.BooleanQuery;
@@ -202,6 +203,7 @@ public class SolrConfig extends Config {
     loadPluginInfo(CodecProviderFactory.class,"mainIndex/codecProviderFactory",false, false);
     loadPluginInfo(IndexReaderFactory.class,"indexReaderFactory",false, true);
     loadPluginInfo(UpdateRequestProcessorChain.class,"updateRequestProcessorChain",false, false);
+    loadPluginInfo(UpdateLog.class,"updateHandler/updateLog",false, false);
 
     updateHandlerInfo = loadUpdatehandlerInfo();
 
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index fdc9a746327..368a22e8e4f 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -889,6 +889,7 @@ public final class SolrCore implements SolrInfoMBean {
     addIfNotPresent(components,MoreLikeThisComponent.COMPONENT_NAME,MoreLikeThisComponent.class);
     addIfNotPresent(components,StatsComponent.COMPONENT_NAME,StatsComponent.class);
     addIfNotPresent(components,DebugComponent.COMPONENT_NAME,DebugComponent.class);
+    addIfNotPresent(components,RealTimeGetComponent.COMPONENT_NAME,RealTimeGetComponent.class);
     return components;
   }
   private void addIfNotPresent(Map<String, SearchComponent> registry, String name, Class c){
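An end-to-end illustration of the feature (distilled from the TestRealTimeGet changes later in this patch, using the test helpers; nothing here is additional API surface): an uncommitted add is invisible to a normal query but immediately retrievable through the realtime-get handler.

    assertU(adoc("id","1"));                                       // indexed, but not committed
    assertJQ(req("q","id:1"), "/response/numFound==0");            // a normal query can't see it yet
    assertJQ(req("qt","/get","id","1"), "=={'doc':{'id':'1'}}");   // realtime-get already can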
diff --git a/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java b/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
new file mode 100644
index 00000000000..901e366f91a
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.handler;
+
+import org.apache.solr.handler.component.*;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ *
+ * All of the following options may be configured for this handler
+ * in the solrconfig as defaults, and may be overridden as request parameters.
+ * (TODO: complete documentation of request parameters here, rather than only
+ * on the wiki).
+ *
+ *
+ *
+ */
+public class RealTimeGetHandler extends SearchHandler {
+  @Override
+  protected List<String> getDefaultComponents()
+  {
+    List<String> names = new ArrayList<String>(1);
+    names.add(RealTimeGetComponent.COMPONENT_NAME);
+    return names;
+  }
+
+  //////////////////////// SolrInfoMBeans methods //////////////////////
+
+  @Override
+  public String getVersion() {
+    return "$Revision$";
+  }
+
+  @Override
+  public String getDescription() {
+    return "The realtime get handler";
+  }
+
+  @Override
+  public String getSourceId() {
+    return "$Id$";
+  }
+
+  @Override
+  public String getSource() {
+    return "$URL$";
+  }
+
+  @Override
+  public URL[] getDocs() {
+    return null;
+  }
+}
+
+
+
+
+
+
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
new file mode 100644
index 00000000000..25c4dbe704d
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.handler.component;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.FieldType;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.ReturnFields;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.DocumentBuilder;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.util.RefCounted;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TODO!
+ *
+ *
+ * @since solr 4.0
+ */
+public class RealTimeGetComponent extends SearchComponent
+{
+  public static final String COMPONENT_NAME = "get";
+
+  @Override
+  public void prepare(ResponseBuilder rb) throws IOException {
+    // Set field flags
+    ReturnFields returnFields = new ReturnFields( rb.req );
+    rb.rsp.setReturnFields( returnFields );
+  }
+
+
+  @Override
+  public void process(ResponseBuilder rb) throws IOException
+  {
+    SolrQueryRequest req = rb.req;
+    SolrQueryResponse rsp = rb.rsp;
+    SolrParams params = req.getParams();
+
+    if (!params.getBool(COMPONENT_NAME, true)) {
+      return;
+    }
+
+    String id[] = params.getParams("id");
+    String ids[] = params.getParams("ids");
+
+    if (id == null && ids == null) {
+      return;
+    }
+
+    String[] allIds = id==null ? new String[0] : id;
+
+    if (ids != null) {
+      List<String> lst = new ArrayList<String>();
+      for (String s : allIds) {
+        lst.add(s);
+      }
+      for (String idList : ids) {
+        lst.addAll( StrUtils.splitSmart(idList, ",", true) );
+      }
+      allIds = lst.toArray(new String[lst.size()]);
+    }
+
+    SchemaField idField = req.getSchema().getUniqueKeyField();
+    FieldType fieldType = idField.getType();
+
+    SolrDocumentList docList = new SolrDocumentList();
+    UpdateLog ulog = req.getCore().getUpdateHandler().getUpdateLog();
+
+    RefCounted<SolrIndexSearcher> searcherHolder = null;
+
+    try {
+      SolrIndexSearcher searcher = null;
+
+      BytesRef idBytes = new BytesRef();
+      for (String idStr : allIds) {
+        fieldType.readableToIndexed(idStr, idBytes);
+        if (ulog != null) {
+          Object o = ulog.lookup(idBytes);
+          if (o != null) {
+            // should currently be a List<Object>
+            List entry = (List)o;
+            assert entry.size() >= 3;
+            int oper = (Integer)entry.get(0);
+            switch (oper) {
+              case UpdateLog.ADD:
+                docList.add(toSolrDoc((SolrInputDocument)entry.get(entry.size()-1), req.getSchema()));
+                break;
+              case UpdateLog.DELETE:
+                break;
+              default:
+                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+            }
+            continue;
+          }
+        }
+
+        // didn't find it in the update log, so it should be in the newest searcher opened
+        if (searcher == null) {
+          searcherHolder = req.getCore().getNewestSearcher(false);
+          searcher = searcherHolder.get();
+        }
+
+        int docid = searcher.getFirstMatch(new Term(idField.getName(), idBytes));
+        if (docid < 0) continue;
+        Document luceneDocument = searcher.doc(docid);
+        docList.add(toSolrDoc(luceneDocument, req.getSchema()));
+      }
+
+    } finally {
+      if (searcherHolder != null) {
+        searcherHolder.decref();
+      }
+    }
+
+
+    // if the client specified a single id=foo, then use "doc":{
+    // otherwise use a standard doclist
+
+    if (ids == null && allIds.length <= 1) {
+      // if the doc was not found, then use a value of null.
+      rsp.add("doc", docList.size() > 0 ? docList.get(0) : null);
+    } else {
+      docList.setNumFound(docList.size());
+      rsp.add("response", docList);
+    }
+
+  }
+
+  private static SolrDocument toSolrDoc(Document doc, IndexSchema schema) {
+    SolrDocument out = new SolrDocument();
+    for( IndexableField f : doc.getFields() ) {
+      // Make sure multivalued fields are represented as lists
+      Object existing = out.get(f.name());
+      if (existing == null) {
+        SchemaField sf = schema.getFieldOrNull(f.name());
+        if (sf != null && sf.multiValued()) {
+          List<Object> vals = new ArrayList<Object>();
+          vals.add( f );
+          out.setField( f.name(), vals );
+        }
+        else{
+          out.setField( f.name(), f );
+        }
+      }
+      else {
+        out.addField( f.name(), f );
+      }
+    }
+    return out;
+  }
+
+  private static SolrDocument toSolrDoc(SolrInputDocument sdoc, IndexSchema schema) {
+    // TODO: do something more performant than this double conversion
+    Document doc = DocumentBuilder.toDocument(sdoc, schema);
+
+    // copy the stored fields only
+    Document out = new Document();
+    for (IndexableField f : doc.getFields()) {
+      if (f.stored()) {
+        out.add(f);
+      }
+    }
+
+    return toSolrDoc(out, schema);
+  }
+
+
+
+  ////////////////////////////////////////////
+  ///  SolrInfoMBean
+  ////////////////////////////////////////////
+
+  @Override
+  public String getDescription() {
+    return "realtime get";
+  }
+
+  @Override
+  public String getVersion() {
+    return "$Revision$";
+  }
+
+  @Override
+  public String getSourceId() {
+    return "$Id$";
+  }
+
+  @Override
+  public String getSource() {
+    return "$URL$";
+  }
+
+  @Override
+  public URL[] getDocs() {
+    return null;
+  }
+}
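One pattern in the component above worth spelling out: SolrCore.getNewestSearcher(false) returns a reference-counted holder, and the component releases it in the finally block. Sketched in isolation (simplified surroundings; illustrative only, not part of the patch):

    RefCounted<SolrIndexSearcher> holder = core.getNewestSearcher(false);
    try {
      SolrIndexSearcher searcher = holder.get();
      // ... use the searcher; don't let it escape this scope ...
    } finally {
      holder.decref();   // allows the core to close the searcher once unreferenced
    }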
diff --git a/solr/core/src/java/org/apache/solr/search/QueryResultKey.java b/solr/core/src/java/org/apache/solr/search/QueryResultKey.java
index 679d3902d73..3aa743a6fb2 100644
--- a/solr/core/src/java/org/apache/solr/search/QueryResultKey.java
+++ b/solr/core/src/java/org/apache/solr/search/QueryResultKey.java
@@ -46,7 +46,10 @@ public final class QueryResultKey {
 
     int h = query.hashCode();
 
-    if (filters != null) h ^= filters.hashCode();
+    if (filters != null) {
+      for (Query filt : filters)
+        h += filt.hashCode();
+    }
 
     sfields = (this.sort !=null) ? this.sort.getSort() : defaultSort;
     for (SortField sf : sfields) {
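The QueryResultKey change above replaces the order-dependent List.hashCode() of the filter list with a sum of the individual filter hashes, so the same set of fq clauses hashes identically regardless of order (this only pays off if the corresponding equals() also treats the filters as order-insensitive). A self-contained demonstration of the difference:

    import java.util.Arrays;
    import java.util.List;

    public class FilterHashDemo {
      public static void main(String[] args) {
        List<String> ab = Arrays.asList("a", "b");
        List<String> ba = Arrays.asList("b", "a");
        // list hashing depends on element order:
        System.out.println(ab.hashCode() == ba.hashCode());   // false
        // summing element hashes does not:
        System.out.println("a".hashCode() + "b".hashCode()
                        == "b".hashCode() + "a".hashCode());  // true
      }
    }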
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index d28cc7ad7b1..7ff7789333b 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -24,7 +24,10 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
@@ -53,6 +56,7 @@ import org.apache.solr.search.SolrIndexSearcher;
  */
 public class DirectUpdateHandler2 extends UpdateHandler {
   protected SolrCoreState indexWriterProvider;
+  protected final Lock commitLock = new ReentrantLock();
 
   // stats
   AtomicLong addCommands = new AtomicLong();
@@ -110,6 +114,8 @@ public class DirectUpdateHandler2 extends UpdateHandler {
     int softCommitTimeUpperBound = updateHandlerInfo.autoSoftCommmitMaxTime; // getInt("updateHandler/autoSoftCommit/maxTime", -1);
     softCommitTracker = new CommitTracker("Soft", core, softCommitDocsUpperBound, softCommitTimeUpperBound, true, true);
+    this.ulog = updateHandler.getUpdateLog();
+    this.ulog.init(this, core);
   }
 
   private void deleteAll() throws IOException {
@@ -163,6 +169,12 @@ public class DirectUpdateHandler2 extends UpdateHandler {
         writer.addDocument(cmd.getLuceneDocument());
       }
 
+      // Add to the transaction log *after* successfully adding to the index, if there was no error.
+      // This ordering ensures that if we log it, it's definitely been added to the index.
+      // This also ensures that if a commit sneaks in-between, we know everything in a particular
+      // log version was definitely committed.
+      ulog.add(cmd);
+
       rc = 1;
     } finally {
       if (rc!=1) {
@@ -185,6 +197,8 @@ public class DirectUpdateHandler2 extends UpdateHandler {
 
     indexWriterProvider.getIndexWriter(core).deleteDocuments(new Term(idField.getName(), cmd.getIndexedId()));
 
+    ulog.delete(cmd);
+
     if (commitTracker.getTimeUpperBound() > 0) {
       commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
     }
@@ -216,7 +230,9 @@ public class DirectUpdateHandler2 extends UpdateHandler {
     } else {
       indexWriterProvider.getIndexWriter(core).deleteDocuments(q);
     }
-
+
+    ulog.deleteByQuery(cmd);
+
     madeIt = true;
 
     if (commitTracker.getTimeUpperBound() > 0) {
@@ -278,6 +294,11 @@ public class DirectUpdateHandler2 extends UpdateHandler {
     boolean error=true;
 
     try {
+      // only allow one hard commit to proceed at once
+      if (!cmd.softCommit) {
+        commitLock.lock();
+      }
+
       log.info("start "+cmd);
 
       if (cmd.optimize) {
@@ -287,6 +308,10 @@ public class DirectUpdateHandler2 extends UpdateHandler {
       }
 
       if (!cmd.softCommit) {
+        synchronized (this) { // sync is currently needed to prevent preCommit from being called between preSoft and postSoft... see postSoft comments.
+          ulog.preCommit(cmd);
+        }
+
         writer.commit();
         numDocsPending.set(0);
         callPostCommitCallbacks();
@@ -300,13 +325,23 @@ public class DirectUpdateHandler2 extends UpdateHandler {
       }
 
-      synchronized (this) {
       if (cmd.softCommit) {
-        core.getSearcher(true,false,waitSearcher, true);
+        // ulog.preSoftCommit();
+        synchronized (this) {
+          ulog.preSoftCommit(cmd);
+          core.getSearcher(true,false,waitSearcher, true);
+          ulog.postSoftCommit(cmd);
+        }
+        // ulog.postSoftCommit();
       } else {
-        core.getSearcher(true,false,waitSearcher);
+        synchronized (this) {
+          ulog.preSoftCommit(cmd);
+          core.getSearcher(true,false,waitSearcher);
+          ulog.postSoftCommit(cmd);
+        }
+        ulog.postCommit(cmd);  // postCommit currently means new searcher has also been opened
       }
-      }
+
 
       // reset commit tracking
@@ -321,6 +356,10 @@ public class DirectUpdateHandler2 extends UpdateHandler {
       error=false;
     }
     finally {
+      if (!cmd.softCommit) {
+        commitLock.unlock();
+      }
+
       addCommands.set(0);
       deleteByIdCommands.set(0);
       deleteByQueryCommands.set(0);
@@ -396,6 +435,10 @@ public class DirectUpdateHandler2 extends UpdateHandler {
     }
   }
 
+  @Override
+  public UpdateLog getUpdateLog() {
+    return ulog;
+  }
 
   @Override
   public void close() throws IOException {
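Condensing the commit path above, a hard commit now interleaves with the update log as follows (a plain-Java reading of the code, written out for clarity; not additional behavior):

    commitLock.lock();               // only one hard commit in flight
    synchronized (this) {
      ulog.preCommit(cmd);           // rotate to a fresh tlog; the old one is frozen
    }
    writer.commit();                 // the frozen tlog's contents are now durable in the index
    synchronized (this) {
      ulog.preSoftCommit(cmd);       // snapshot the in-memory id->pointer map
      core.getSearcher(true, false, waitSearcher);
      ulog.postSoftCommit(cmd);      // new searcher is open; old snapshots can be dropped
    }
    ulog.postCommit(cmd);            // release (decref) the frozen tlog
    // ... and in the finally block:
    commitLock.unlock();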
diff --git a/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java b/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
new file mode 100644
index 00000000000..630c73c3167
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
@@ -0,0 +1,595 @@
+package org.apache.solr.update;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.FastInputStream;
+import org.apache.solr.common.util.FastOutputStream;
+import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.PluginInfo;
+import org.apache.solr.core.SolrCore;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class NullUpdateLog extends UpdateLog {
+  @Override
+  public void init(PluginInfo info) {
+  }
+
+  @Override
+  public void init(UpdateHandler uhandler, SolrCore core) {
+  }
+
+  @Override
+  public void add(AddUpdateCommand cmd) {
+  }
+
+  @Override
+  public void delete(DeleteUpdateCommand cmd) {
+  }
+
+  @Override
+  public void deleteByQuery(DeleteUpdateCommand cmd) {
+  }
+
+  @Override
+  public void preCommit(CommitUpdateCommand cmd) {
+  }
+
+  @Override
+  public void postCommit(CommitUpdateCommand cmd) {
+  }
+
+  @Override
+  public void preSoftCommit(CommitUpdateCommand cmd) {
+  }
+
+  @Override
+  public void postSoftCommit(CommitUpdateCommand cmd) {
+  }
+
+  @Override
+  public Object lookup(BytesRef indexedId) {
+    return null;
+  }
+
+  @Override
+  public void close() {
+  }
+}
+
+public class FSUpdateLog extends UpdateLog {
+
+  public static String TLOG_NAME="tlog";
+
+  long id = -1;
+
+  private TransactionLog tlog;
+  private TransactionLog prevTlog;
+
+  private Map<BytesRef,LogPtr> map = new HashMap<BytesRef,LogPtr>();
+  private Map<BytesRef,LogPtr> prevMap;   // used while committing/reopening is happening
+  private Map<BytesRef,LogPtr> prevMap2;  // used while committing/reopening is happening
+  private TransactionLog prevMapLog;   // the transaction log used to look up entries found in prevMap
+  private TransactionLog prevMapLog2;  // the transaction log used to look up entries found in prevMap2
+
+  private String[] tlogFiles;
+  private File tlogDir;
+  private Collection<String> globalStrings;
+
+  private String dataDir;
+  private String lastDataDir;
+
+  @Override
+  public void init(PluginInfo info) {
+    dataDir = (String)info.initArgs.get("dir");
+  }
+
+  @Override
+  public void init(UpdateHandler uhandler, SolrCore core) {
+    if (dataDir == null || dataDir.length()==0) {
+      dataDir = core.getDataDir();
+    }
+
+    if (dataDir.equals(lastDataDir)) {
+      // on a normal reopen, we currently shouldn't have to do anything
+      return;
+    }
+    lastDataDir = dataDir;
+    tlogDir = new File(dataDir, TLOG_NAME);
+    tlogDir.mkdirs();
+    tlogFiles = getLogList(tlogDir);
+    id = getLastLogId() + 1;   // add 1 since we will create a new log for the next update
+  }
+
+  static class LogPtr {
+    final long pointer;
+    public LogPtr(long pointer) {
+      this.pointer = pointer;
+    }
+
+    public String toString() {
+      return "LogPtr(" + pointer + ")";
+    }
+  }
+
+  public static String[] getLogList(File directory) {
+    final String prefix = TLOG_NAME+'.';
+    String[] names = directory.list(new FilenameFilter() {
+      public boolean accept(File dir, String name) {
+        return name.startsWith(prefix);
+      }
+    });
+    Arrays.sort(names);
+    return names;
+  }
+
+
+  public long getLastLogId() {
+    if (id != -1) return id;
+    if (tlogFiles.length == 0) return -1;
+    String last = tlogFiles[tlogFiles.length-1];
+    return Long.parseLong(last.substring(TLOG_NAME.length()+1));
+  }
+
+
+  @Override
+  public void add(AddUpdateCommand cmd) {
+    synchronized (this) {
+      ensureLog();
+      long pos = tlog.write(cmd);
+      LogPtr ptr = new LogPtr(pos);
+      map.put(cmd.getIndexedId(), ptr);
+      // System.out.println("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+    }
+  }
+
+  @Override
+  public void delete(DeleteUpdateCommand cmd) {
+    BytesRef br = cmd.getIndexedId();
+
+    synchronized (this) {
+      ensureLog();
+      long pos = tlog.writeDelete(cmd);
+      LogPtr ptr = new LogPtr(pos);
+      map.put(br, ptr);
+      // System.out.println("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+    }
+  }
+
+  @Override
+  public void deleteByQuery(DeleteUpdateCommand cmd) {
+    synchronized (this) {
+      ensureLog();
+      // TODO: how to support realtime-get, optimistic concurrency, or anything else in this case?
+      // Maybe we shouldn't?
+      // realtime-get could just do a reopen of the searcher
+      // optimistic concurrency? Maybe we shouldn't support deleteByQuery w/ optimistic concurrency
+      long pos = tlog.writeDeleteByQuery(cmd);
+      LogPtr ptr = new LogPtr(pos);
+      // System.out.println("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+    }
+  }
+
+
+  private void newMap() {
+    prevMap2 = prevMap;
+    prevMapLog2 = prevMapLog;
+
+    prevMap = map;
+    prevMapLog = tlog;
+
+    map = new HashMap<BytesRef,LogPtr>();
+  }
+
+  private void clearOldMaps() {
+    prevMap = null;
+    prevMap2 = null;
+  }
+
+  @Override
+  public void preCommit(CommitUpdateCommand cmd) {
+    synchronized (this) {
+      // since we're changing the log, we must change the map.
+      newMap();
+
+      // since document additions can happen concurrently with commit, create
+      // a new transaction log first so that we know the old one is definitely
+      // in the index.
+      prevTlog = tlog;
+      tlog = null;
+      id++;
+
+      if (prevTlog != null) {
+        globalStrings = prevTlog.getGlobalStrings();
+      }
+    }
+  }
+
+  @Override
+  public void postCommit(CommitUpdateCommand cmd) {
+    synchronized (this) {
+      if (prevTlog != null) {
+        prevTlog.decref();
+        prevTlog = null;
+      }
+    }
+  }
+
+  @Override
+  public void preSoftCommit(CommitUpdateCommand cmd) {
+    synchronized (this) {
+      if (!cmd.softCommit) return;  // already handled this at the start of the hard commit
+
+      // start adding documents to a new map since we won't know if
+      // any added documents will make it into this commit or not.
+      // But we do know that any updates already added will definitely
+      // show up in the latest reader after the commit succeeds.
+      newMap();
+      // System.out.println("TLOG: preSoftCommit: prevMap="+ System.identityHashCode(prevMap) + " new map=" + System.identityHashCode(map));
+    }
+  }
+
+  @Override
+  public void postSoftCommit(CommitUpdateCommand cmd) {
+    synchronized (this) {
+      // We can clear out all old maps now that a new searcher has been opened.
+      // This currently only works since DUH2 synchronizes around preCommit to avoid
+      // it being called in the middle of a preSoftCommit, postSoftCommit sequence.
+      // If this DUH2 synchronization were to be removed, preSoftCommit should
+      // record what old maps were created and only remove those.
+      clearOldMaps();
+      // System.out.println("TLOG: postSoftCommit: disposing of prevMap="+ System.identityHashCode(prevMap));
+    }
+  }
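+
+  // Putting the snapshot lifecycle together:
+  //   preCommit:      map -> prevMap, and a fresh tlog is created, so anything
+  //                   in prevMap is guaranteed to live in the old (frozen) log.
+  //   writer.commit:  everything referenced by prevMap is durable in the index.
+  //   preSoftCommit:  map rotates again, because adds racing the searcher reopen
+  //                   may or may not be visible to the new searcher.
+  //   postSoftCommit: the new searcher is open, so anything the old snapshots
+  //                   pointed to is now searchable and they can be dropped.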
+
+  @Override
+  public Object lookup(BytesRef indexedId) {
+    LogPtr entry;
+    TransactionLog lookupLog;
+
+    synchronized (this) {
+      entry = map.get(indexedId);
+      lookupLog = tlog;  // something found in "map" will always be in "tlog"
+      // System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in map " + System.identityHashCode(map) + " got " + entry + " lookupLog=" + lookupLog);
+      if (entry == null && prevMap != null) {
+        entry = prevMap.get(indexedId);
+        // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
+        lookupLog = prevMapLog;
+        // System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap " + System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog);
+      }
+      if (entry == null && prevMap2 != null) {
+        entry = prevMap2.get(indexedId);
+        // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
+        lookupLog = prevMapLog2;
+        // System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap2 " + System.identityHashCode(prevMap2) + " got " + entry + " lookupLog="+lookupLog);
+      }
+
+      if (entry == null) {
+        return null;
+      }
+      lookupLog.incref();
+    }
+
+    try {
+      // now do the lookup outside of the sync block for concurrency
+      return lookupLog.lookup(entry.pointer);
+    } finally {
+      lookupLog.decref();
+    }
+
+  }
+
+
+  private void ensureLog() {
+    if (tlog == null) {
+      String newLogName = String.format("%s.%019d", TLOG_NAME, id);
+      tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
+    }
+  }
+
+  @Override
+  public void close() {
+    synchronized (this) {
+      if (prevTlog != null) {
+        prevTlog.decref();
+      }
+      if (tlog != null) {
+        tlog.decref();
+      }
+    }
+  }
+
+
+}
+
+
+/**
+ *  Log Format: List{Operation, Version, ...}
+ *  ADD, VERSION, DOC
+ *  DELETE, VERSION, ID_BYTES
+ *  DELETE_BY_QUERY, VERSION, String
+ *
+ *  TODO: keep two files, one for [operation, version, id] and the other for the actual
+ *  document data.  That way we could throw away document log files more readily
+ *  while retaining the smaller operation log files longer (and we can retrieve
+ *  the stored fields from the latest documents from the index).
+ *
+ *  This would require keeping all source fields stored, of course.
+ *
+ *  This would also allow us to not log document data for requests with commit=true
+ *  in them (since we know that if the request succeeds, all docs will be committed).
+ *
+ */
+class TransactionLog {
+
+  long id;
+  File tlogFile;
+  RandomAccessFile raf;
+  FileChannel channel;
+  OutputStream os;
+  FastOutputStream fos;
+  InputStream is;
+  long start;
+
+  volatile boolean deleteOnClose = true;  // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
+
+  AtomicInteger refcount = new AtomicInteger(1);
+  Map<String,Integer> globalStringMap = new HashMap<String,Integer>();
+  List<String> globalStringList = new ArrayList<String>();
+
+  // write a BytesRef as a byte array
+  JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
+    @Override
+    public Object resolve(Object o, JavaBinCodec codec) throws IOException {
+      if (o instanceof BytesRef) {
+        BytesRef br = (BytesRef)o;
+        codec.writeByteArray(br.bytes, br.offset, br.length);
+        return null;
+      }
+      return o;
+    }
+  };
+
+  public class LogCodec extends JavaBinCodec {
+    public LogCodec() {
+      super(resolver);
+    }
+
+    @Override
+    public void writeExternString(String s) throws IOException {
+      if (s == null) {
+        writeTag(NULL);
+        return;
+      }
+
+      // no need to synchronize globalStringMap - it's only updated before the first record is written to the log
+      Integer idx = globalStringMap.get(s);
+      if (idx == null) {
+        // write a normal string
+        writeStr(s);
+      } else {
+        // write the extern string
+        writeTag(EXTERN_STRING, idx);
+      }
+    }
+
+    @Override
+    public String readExternString(FastInputStream fis) throws IOException {
+      int idx = readSize(fis);
+      if (idx != 0) {  // idx != 0 is the index of the extern string
+        // no need to synchronize globalStringList - it's only updated before the first record is written to the log
+        return globalStringList.get(idx - 1);
+      } else {  // idx == 0 means it has a string value
+        // this shouldn't happen with this codec subclass.
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
+      }
+    }
+
+
+  }
+
+  public long writeData(Object o) {
+    LogCodec codec = new LogCodec();
+    try {
+      long pos = start + fos.size();   // if we had flushed, this should be equal to channel.position()
+      codec.init(fos);
+      codec.writeVal(o);
+      return pos;
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  TransactionLog(File tlogFile, Collection<String> globalStrings) {
+    try {
+      this.tlogFile = tlogFile;
+      raf = new RandomAccessFile(this.tlogFile, "rw");
+      start = raf.length();
+      channel = raf.getChannel();
+      os = Channels.newOutputStream(channel);
+      fos = FastOutputStream.wrap(os);
+      addGlobalStrings(globalStrings);
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  private void addGlobalStrings(Collection<String> strings) {
+    if (strings == null) return;
+    int origSize = globalStringMap.size();
+    for (String s : strings) {
+      Integer idx = null;
+      if (origSize > 0) {
+        idx = globalStringMap.get(s);
+      }
+      if (idx != null) continue;  // already in list
+      globalStringList.add(s);
+      globalStringMap.put(s, globalStringList.size());
+    }
+    assert globalStringMap.size() == globalStringList.size();
+  }
+
+  Collection<String> getGlobalStrings() {
+    synchronized (fos) {
+      return new ArrayList<String>(globalStringList);
+    }
+  }
+
+  private void writeLogHeader(LogCodec codec) throws IOException {
+    NamedList header = new NamedList();
+    header.add("SOLR_TLOG",1);  // a magic string + version number?
+    header.add("strings",globalStringList);
+    codec.marshal(header, fos);
+  }
+
+
+  public long write(AddUpdateCommand cmd) {
+    LogCodec codec = new LogCodec();
+    synchronized (fos) {
+      try {
+        long pos = start + fos.size();   // if we had flushed, this should be equal to channel.position()
+        SolrInputDocument sdoc = cmd.getSolrInputDocument();
+
+        if (pos == 0) {  // TODO: needs to be changed if we start writing a header first
+          addGlobalStrings(sdoc.getFieldNames());
+          pos = start + fos.size();
+        }
+
+        codec.init(fos);
+        codec.writeTag(JavaBinCodec.ARR, 3);
+        codec.writeInt(UpdateLog.ADD);  // should just take one byte
+        codec.writeLong(0);             // the version... should also just be one byte if 0
+        codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+        // fos.flushBuffer();  // flush later
+        return pos;
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    }
+  }
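+
+  // On-disk record sketch (JavaBin-encoded), as produced by write() above:
+  //   TAG_ARR(3)              - a list of three elements
+  //   INT   UpdateLog.ADD     - the operation
+  //   LONG  0                 - the version (not used yet)
+  //   SolrInputDocument       - the complete document, stored fields included
+  // writeDelete/writeDeleteByQuery below emit the same 3-element shape, with
+  // the document replaced by the indexed id bytes or the query string.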
+
+  public long writeDelete(DeleteUpdateCommand cmd) {
+    LogCodec codec = new LogCodec();
+    synchronized (fos) {
+      try {
+        long pos = start + fos.size();   // if we had flushed, this should be equal to channel.position()
+        if (pos == 0) {
+          writeLogHeader(codec);
+          pos = start + fos.size();
+        }
+        codec.init(fos);
+        codec.writeTag(JavaBinCodec.ARR, 3);
+        codec.writeInt(UpdateLog.DELETE);  // should just take one byte
+        codec.writeLong(0);                // the version... should also just be one byte if 0
+        BytesRef br = cmd.getIndexedId();
+        codec.writeByteArray(br.bytes, br.offset, br.length);
+        // fos.flushBuffer();  // flush later
+        return pos;
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    }
+  }
+
+  public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
+    LogCodec codec = new LogCodec();
+    synchronized (fos) {
+      try {
+        long pos = start + fos.size();   // if we had flushed, this should be equal to channel.position()
+        if (pos == 0) {
+          writeLogHeader(codec);
+          pos = start + fos.size();
+        }
+        codec.init(fos);
+        codec.writeTag(JavaBinCodec.ARR, 3);
+        codec.writeInt(UpdateLog.DELETE_BY_QUERY);  // should just take one byte
+        codec.writeLong(0);                         // the version... should also just be one byte if 0
+        codec.writeStr(cmd.query);
+        // fos.flushBuffer();  // flush later
+        return pos;
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    }
+  }
+
+  /* This method is thread safe */
+  public Object lookup(long pos) {
+    try {
+      // make sure any unflushed buffer has been flushed
+      synchronized (fos) {
+        // TODO: optimize this by keeping track of what we have flushed up to
+        fos.flushBuffer();
+      }
+
+      ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos);
+      LogCodec codec = new LogCodec();
+      return codec.readVal(fis);
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  public void incref() {
+    refcount.incrementAndGet();
+  }
+
+  public void decref() {
+    if (refcount.decrementAndGet() == 0) {
+      close();
+    }
+  }
+
+
+  private void close() {
+    try {
+      fos.flush();
+      fos.close();
+      if (deleteOnClose) {
+        tlogFile.delete();
+      }
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  public String toString() {
+    return tlogFile.toString();
+  }
+
+}
+
+
+class ChannelFastInputStream extends FastInputStream {
+  FileChannel ch;
+  long chPosition;
+
+  public ChannelFastInputStream(FileChannel ch, long chPosition) {
+    super(null);
+    this.ch = ch;
+    this.chPosition = chPosition;
+  }
+
+  @Override
+  public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
+    ByteBuffer bb = ByteBuffer.wrap(target, offset, len);
+    int ret = ch.read(bb, chPosition);
+    if (ret >= 0) {
+      chPosition += ret;
+    }
+    return ret;
+  }
+
+  @Override
+  public void close() throws IOException {
+    ch.close();
+  }
+}
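ChannelFastInputStream above is what makes TransactionLog.lookup() safe next to concurrent appends: FileChannel.read(ByteBuffer, long) is a positional read ("pread") that never touches the channel's shared position. The idiom in minimal form (illustrative, with invented names):

    // read up to len bytes at absolute offset pos, leaving the channel position alone
    ByteBuffer buf = ByteBuffer.allocate(len);
    while (buf.hasRemaining()) {
      int n = channel.read(buf, pos + buf.position());
      if (n < 0) break;  // EOF
    }
    buf.flip();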
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
index 318696b4612..61aaad4a509 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
@@ -19,6 +19,7 @@ package org.apache.solr.update;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Vector;
 
 import org.apache.solr.core.PluginInfo;
@@ -54,6 +55,8 @@ public abstract class UpdateHandler implements SolrInfoMBean {
   protected Vector<SolrEventListener> softCommitCallbacks = new Vector<SolrEventListener>();
   protected Vector<SolrEventListener> optimizeCallbacks = new Vector<SolrEventListener>();
 
+  protected UpdateLog ulog;
+
   /**
    * Called when a SolrCore using this UpdateHandler is closed.
    */
@@ -81,6 +84,19 @@ public abstract class UpdateHandler implements SolrInfoMBean {
     }
   }
 
+
+  private void initLog() {
+    PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName());
+    if (ulogPluginInfo != null && ulogPluginInfo.isEnabled()) {
+      ulog = core.createInitInstance(ulogPluginInfo, UpdateLog.class, "update log", "solr.NullUpdateLog");
+    } else {
+      ulog = new NullUpdateLog();
+      ulog.init(null);
+    }
+    ulog.init(this, core);
+  }
+
+
   protected void callPostCommitCallbacks() {
     for (SolrEventListener listener : commitCallbacks) {
       listener.postCommit();
@@ -105,6 +121,7 @@ public abstract class UpdateHandler implements SolrInfoMBean {
     idField = schema.getUniqueKeyField();
     idFieldType = idField!=null ? idField.getType() : null;
     parseEventListeners();
+    initLog();
   }
 
   /**
@@ -133,7 +150,7 @@ public abstract class UpdateHandler implements SolrInfoMBean {
   public abstract void commit(CommitUpdateCommand cmd) throws IOException;
   public abstract void rollback(RollbackUpdateCommand cmd) throws IOException;
   public abstract void close() throws IOException;
-
+  public abstract UpdateLog getUpdateLog();
 
   /**
    * NOTE: this function is not thread safe.  However, it is safe to call within the
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
new file mode 100644
index 00000000000..868ea8cbb1a
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -0,0 +1,22 @@
+package org.apache.solr.update;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.util.plugin.PluginInfoInitialized;
+
+public abstract class UpdateLog implements PluginInfoInitialized {
+  public static final int ADD = 0x00;
+  public static final int DELETE = 0x01;
+  public static final int DELETE_BY_QUERY = 0x02;
+
+  public abstract void init(UpdateHandler uhandler, SolrCore core);
+  public abstract void add(AddUpdateCommand cmd);
+  public abstract void delete(DeleteUpdateCommand cmd);
+  public abstract void deleteByQuery(DeleteUpdateCommand cmd);
+  public abstract void preCommit(CommitUpdateCommand cmd);
+  public abstract void postCommit(CommitUpdateCommand cmd);
+  public abstract void preSoftCommit(CommitUpdateCommand cmd);
+  public abstract void postSoftCommit(CommitUpdateCommand cmd);
+  public abstract Object lookup(BytesRef indexedId);
+  public abstract void close();
+}
diff --git a/solr/core/src/test-files/solr/conf/solrconfig-tlog.xml b/solr/core/src/test-files/solr/conf/solrconfig-tlog.xml
new file mode 100644
index 00000000000..2c4e923a97d
--- /dev/null
+++ b/solr/core/src/test-files/solr/conf/solrconfig-tlog.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" ?>
+<config>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <updateLog class="solr.FSUpdateLog">
+      <str name="dir">${solr.data.dir:}</str>
+    </updateLog>
+  </updateHandler>
+
+  <requestHandler name="standard" class="solr.StandardRequestHandler"/>
+
+  <requestHandler name="/get" class="solr.RealTimeGetHandler">
+    <lst name="defaults">
+      <str name="omitHeader">true</str>
+    </lst>
+  </requestHandler>
+
+</config>
diff --git a/solr/core/src/test-files/solr/conf/solrconfig.xml b/solr/core/src/test-files/solr/conf/solrconfig.xml
index 64b5a4216e4..1c85ed80c73 100644
--- a/solr/core/src/test-files/solr/conf/solrconfig.xml
+++ b/solr/core/src/test-files/solr/conf/solrconfig.xml
@@ -252,6 +252,13 @@
       <bool name="hl">true</bool>
     </lst>
   </requestHandler>
 
+  <requestHandler name="/get" class="solr.RealTimeGetHandler">
+    <lst name="defaults">
+      <str name="omitHeader">true</str>
+    </lst>
+  </requestHandler>
+
+
   <requestHandler name="dismax" class="solr.SearchHandler" >
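A design note on initLog() above: when no updateLog is configured, UpdateHandler installs a no-op NullUpdateLog rather than leaving ulog null. That null-object choice is what lets DirectUpdateHandler2 call ulog.add(cmd), ulog.preCommit(cmd), and friends unconditionally, avoiding per-update checks on the hot path of the form:

    if (ulog != null) ulog.add(cmd);   // avoided by the null object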
diff --git a/solr/core/src/test/org/apache/solr/MinimalSchemaTest.java b/solr/core/src/test/org/apache/solr/MinimalSchemaTest.java
index 7eca3e4523a..5b55e45fe34 100644
--- a/solr/core/src/test/org/apache/solr/MinimalSchemaTest.java
+++ b/solr/core/src/test/org/apache/solr/MinimalSchemaTest.java
@@ -116,8 +116,10 @@ public class MinimalSchemaTest extends SolrTestCaseJ4 {
 
         assertQ("failure w/handler: '" + handler + "'",
                 req("qt", handler,
-                    // this should be fairly innoculous for any type of query
-                    "q", "foo:bar")
+                    // this should be fairly innocuous for any type of query
+                    "q", "foo:bar",
+                    "omitHeader", "false"
+                    )
                 ,"//lst[@name='responseHeader']"
                 );
       } catch (Exception e) {
diff --git a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
index b5eeadd4c83..f9a4bdadc76 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
@@ -16,25 +16,80 @@
  */
 package org.apache.solr.search;
 
+
 import org.apache.noggit.ObjectBuilder;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.request.SolrQueryRequest;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.Ignore;
 
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class TestRealTimeGet extends SolrTestCaseJ4 {
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    initCore("solrconfig.xml","schema12.xml");
+    initCore("solrconfig-tlog.xml","schema12.xml");
   }
 
+  @Test
+  public void testGetRealtime() throws Exception {
+    clearIndex();
+    assertU(commit());
+
+    assertU(adoc("id","1"));
+    assertJQ(req("q","id:1")
+        ,"/response/numFound==0"
+    );
+    assertJQ(req("qt","/get","id","1")
+        ,"=={'doc':{'id':'1'}}"
+    );
+    assertJQ(req("qt","/get","ids","1")
+        ,"=={" +
+        "    'response':{'numFound':1,'start':0,'docs':[" +
+        "        {" +
+        "          'id':'1'}]" +
+        "    }}"
+    );
+
+    assertU(commit());
+
+    assertJQ(req("q","id:1")
+        ,"/response/numFound==1"
+    );
+    assertJQ(req("qt","/get","id","1")
+        ,"=={'doc':{'id':'1'}}"
+    );
+    assertJQ(req("qt","/get","ids","1")
+        ,"=={" +
+        "    'response':{'numFound':1,'start':0,'docs':[" +
+        "        {" +
+        "          'id':'1'}]" +
+        "    }}"
+    );
+
+    assertU(delI("1"));
+
+    assertJQ(req("q","id:1")
+        ,"/response/numFound==1"
+    );
+    assertJQ(req("qt","/get","id","1")
+        ,"=={'doc':null}"
+    );
+    assertJQ(req("qt","/get","ids","1")
+        ,"=={'response':{'numFound':0,'start':0,'docs':[]}}"
+    );
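+
+    // Note: the delete of id:1 is uncommitted at this point, so a normal query
+    // still reflects the last opened searcher (numFound==1), while realtime-get,
+    // which consults the update log first, already observes the delete.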
+
+  }
+
+
   /***
   @Test
   public void testGetRealtime() throws Exception {
@@ -69,6 +124,19 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
   ***/
 
+  public static void verbose(Object... args) {
+    if (!VERBOSE) return;
+    StringBuilder sb = new StringBuilder("TEST:");
+    sb.append(Thread.currentThread().getName());
+    sb.append(':');
+    for (Object o : args) {
+      sb.append(' ');
+      sb.append(o.toString());
+    }
+    System.out.println(sb.toString());
+  }
+
+
   final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
   Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
   long snapshotCount;
@@ -94,20 +162,24 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
 
   @Test
   public void testStressGetRealtime() throws Exception {
-    // update variables
-    final int commitPercent = 10;
-    final int softCommitPercent = 50; // what percent of the commits are soft
-    final int deletePercent = 8;
-    final int deleteByQueryPercent = 4;
-    final int ndocs = 100;
-    int nWriteThreads = 10;
+    clearIndex();
+    assertU(commit());
+
+    final int commitPercent = 5 + random.nextInt(20);
+    final int softCommitPercent = 30+random.nextInt(60); // what percent of the commits are soft
+    final int deletePercent = 4+random.nextInt(25);
+    final int deleteByQueryPercent = 0;  // real-time get isn't currently supported with delete-by-query
+    final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
+    int nWriteThreads = 5 + random.nextInt(25);
     final int maxConcurrentCommits = nWriteThreads;   // number of committers at a time... it should be <= maxWarmingSearchers
 
     // query variables
-    final int percentRealtimeQuery = 0;   // realtime get is not implemented yet
-    final AtomicLong operations = new AtomicLong(atLeast(10000));  // number of query operations to perform in total
-    int nReadThreads = 10;
+    final int percentRealtimeQuery = 60;
+    final AtomicLong operations = new AtomicLong(50000);  // number of query operations to perform in total
+    int nReadThreads = 5 + random.nextInt(25);
+
+
     initModel(ndocs);
@@ -121,6 +193,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
 
       @Override
       public void run() {
+        try {
         while (operations.get() > 0) {
           int oper = rand.nextInt(100);
 
@@ -134,14 +207,22 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
             version = snapshotCount++;
           }
 
-          if (rand.nextInt(100) < softCommitPercent)
+          if (rand.nextInt(100) < softCommitPercent) {
+            verbose("softCommit start");
             assertU(h.commit("softCommit","true"));
-          else
+            verbose("softCommit end");
+          } else {
+            verbose("commit start");
             assertU(commit());
+            verbose("commit end");
+          }
 
           synchronized(TestRealTimeGet.this) {
-            // install this snapshot only if it's newer than the current one
+            // install this model snapshot only if it's newer than the current one
            if (version >= committedModelClock) {
+              if (VERBOSE) {
+                verbose("installing new committedModel version="+committedModelClock);
+              }
               committedModel = newCommittedModel;
               committedModelClock = version;
             }
@@ -169,14 +250,37 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
           long nextVal = Math.abs(val)+1;
 
           if (oper < commitPercent + deletePercent) {
+            if (VERBOSE) {
+              verbose("deleting id",id,"val=",nextVal);
+            }
+
             assertU("<delete><id>" + id + "</id></delete>");
             model.put(id, -nextVal);
+
+            if (VERBOSE) {
+              verbose("deleting id", id, "val=",nextVal,"DONE");
+            }
           } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+            if (VERBOSE) {
+              verbose("deleteByQuery id ",id, "val=",nextVal);
+            }
+
             assertU("<delete><query>id:" + id + "</query></delete>");
             model.put(id, -nextVal);
+
+            if (VERBOSE) {
+              verbose("deleteByQuery id",id, "val=",nextVal,"DONE");
+            }
           } else {
+            if (VERBOSE) {
+              verbose("adding id", id, "val=", nextVal);
+            }
+
             assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
             model.put(id, nextVal);
+
+            if (VERBOSE) {
+              verbose("adding id", id, "val=", nextVal,"DONE");
+            }
+
           }
@@ -184,6 +288,11 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
             lastId = id;
           }
         }
+        } catch (Throwable e) {
+          operations.set(-1L);
+          SolrException.log(log, e);
+          fail(e.getMessage());
+        }
       }
     };
@@ -199,7 +308,6 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
         public void run() {
           try {
            while (operations.decrementAndGet() >= 0) {
-              int oper = rand.nextInt(100);
 
               // bias toward a recently changed doc
               int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
@@ -217,6 +325,9 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
                 }
               }
 
+              if (VERBOSE) {
+                verbose("querying id", id);
+              }
               SolrQueryRequest sreq;
               if (realTime) {
                 sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
@@ -232,14 +343,17 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
               } else {
                 assertEquals(1, doclist.size());
                 long foundVal = (Long)(((Map)doclist.get(0)).get(field));
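+                // The model value was snapshotted before the query was issued and
+                // concurrent writers only ever increase it, so the index may
+                // legitimately be ahead of the model: require found >= |expected|.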
-                assertTrue(foundVal >= Math.abs(val));
+                if (foundVal < Math.abs(val)) {
+                  verbose("ERROR, id=", id, "foundVal=",foundVal,"model val=",val);
+                  assertTrue(foundVal >= Math.abs(val));
+                }
               }
             }
           } catch (Throwable e) {
             operations.set(-1L);
-            SolrException.log(log,e);
-            fail(e.toString());
+            SolrException.log(log, e);
+            fail(e.getMessage());
           }
         }
       };
@@ -255,5 +369,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
     for (Thread thread : threads) {
       thread.join();
     }
+
   }
+
 }
diff --git a/solr/example/solr/conf/solrconfig.xml b/solr/example/solr/conf/solrconfig.xml
index 794392e3397..b08929c777b 100755
--- a/solr/example/solr/conf/solrconfig.xml
+++ b/solr/example/solr/conf/solrconfig.xml
@@ -343,6 +343,17 @@
          MYVAR=val1
       -->
 
+
+    <!-- Enables a transaction log, currently used for real-time get.
+         "dir" - the target directory for transaction logs; defaults
+         to the solr data directory.
+
+         The realtime-get handler (solr.RealTimeGetHandler, registered
+         at "/get" with omitHeader=true as a default in the test configs)
+         relies on this update log being enabled.  -->
+    <updateLog class="solr.FSUpdateLog">
+      <str name="dir">${solr.data.dir:}</str>
+    </updateLog>