Merged /lucene/dev/trunk:r1514712-1515613

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/lucene5178@1515615 13f79535-47bb-0310-9956-ffa450edef68
Robert Muir 2013-08-19 21:31:34 +00:00
commit 3b49cf1c86
40 changed files with 1476 additions and 529 deletions

View File

@ -600,23 +600,10 @@
<version>2.4</version>
<configuration>
<archive>
<!-- This section should be *exactly* the same under -->
<!-- maven-jar-plugin and maven-war-plugin. -->
<!-- If you make changes here, make the same changes -->
<!-- in the other location as well. -->
<manifestEntries>
<Extension-Name>${project.groupId}</Extension-Name>
<Implementation-Title>${project.groupId}</Implementation-Title>
<Specification-Title>${project.name}</Specification-Title>
<!-- spec version must match "digit+{.digit+}*" -->
<Specification-Version>${base.specification.version}.${now.version}</Specification-Version>
<Specification-Vendor>The Apache Software Foundation</Specification-Vendor>
<!-- impl version can be any string -->
<Implementation-Version>${project.version} ${svn.revision} - ${user.name} - ${now.timestamp}</Implementation-Version>
<Implementation-Vendor>The Apache Software Foundation</Implementation-Vendor>
<X-Compile-Source-JDK>${java.compat.version}</X-Compile-Source-JDK>
<X-Compile-Target-JDK>${java.compat.version}</X-Compile-Target-JDK>
</manifestEntries>
<manifest>
<addDefaultSpecificationEntries>false</addDefaultSpecificationEntries>
<addDefaultImplementationEntries>false</addDefaultImplementationEntries>
</manifest>
</archive>
</configuration>
</plugin>
@ -681,7 +668,7 @@
<configuration>
<archive>
<!-- This section should be *exactly* the same under -->
<!-- maven-jar-plugin and maven-war-plugin. -->
<!-- maven-bundle-plugin and maven-war-plugin. -->
<!-- If you make changes here, make the same changes -->
<!-- in the other location as well. -->
<manifestEntries>
@ -694,6 +681,7 @@
<!-- impl version can be any string -->
<Implementation-Version>${project.version} ${svn.revision} - ${user.name} - ${now.timestamp}</Implementation-Version>
<Implementation-Vendor>The Apache Software Foundation</Implementation-Vendor>
<Implementation-Vendor-Id>${project.groupId}</Implementation-Vendor-Id>
<X-Compile-Source-JDK>${java.compat.version}</X-Compile-Source-JDK>
<X-Compile-Target-JDK>${java.compat.version}</X-Compile-Target-JDK>
</manifestEntries>
@ -806,6 +794,23 @@
<configuration>
<instructions>
<Export-Package>*;-split-package:=merge-first</Export-Package>
<!-- This section should be *exactly* the same under -->
<!-- maven-bundle-plugin and maven-war-plugin. -->
<!-- If you make changes here, make the same changes -->
<!-- in the other location as well. -->
<Extension-Name>${project.groupId}</Extension-Name>
<Implementation-Title>${project.groupId}</Implementation-Title>
<Specification-Title>${project.name}</Specification-Title>
<!-- spec version must match "digit+{.digit+}*" -->
<Specification-Version>${base.specification.version}.${now.version}</Specification-Version>
<Specification-Vendor>The Apache Software Foundation</Specification-Vendor>
<!-- impl version can be any string -->
<Implementation-Version>${project.version} ${svn.revision} - ${user.name} - ${now.timestamp}</Implementation-Version>
<Implementation-Vendor>The Apache Software Foundation</Implementation-Vendor>
<Implementation-Vendor-Id>${project.groupId}</Implementation-Vendor-Id>
<X-Compile-Source-JDK>${java.compat.version}</X-Compile-Source-JDK>
<X-Compile-Target-JDK>${java.compat.version}</X-Compile-Target-JDK>
</instructions>
</configuration>
<executions>

View File

@ -45,7 +45,6 @@ Optimizations
on Windows if NIOFSDirectory is used, mmapped files are still locked.
(Michael Poindexter, Robert Muir, Uwe Schindler)
======================= Lucene 4.5.0 =======================
New features
@ -206,6 +205,13 @@ Optimizations
* LUCENE-5170: Fixed several wrapper analyzers to inherit the reuse strategy
of the wrapped Analyzer. (Uwe Schindler, Robert Muir, Shay Banon)
* LUCENE-5006: Simplified DocumentsWriter and DocumentsWriterPerThread
synchronization and concurrent interaction with IndexWriter. DWPT is now
only setup once and has no reset logic. All segment publishing and state
transition from DWPT into IndexWriter is now done via an Event-Queue
processed from within the IndexWriter in order to prevent situations
where DWPT or DW calls into IW, causing deadlocks. (Simon Willnauer)
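A minimal sketch of the hand-off described in this entry, assuming a simplified stand-in for IndexWriter: DocumentsWriter/DWPT only enqueue events (putEvent), and the queue is drained later from inside IndexWriter, so the indexing side never has to call into IW while holding its own locks. The Event interface shape and putEvent()/eventQueue() mirror this change set; the IndexWriterLike type and the drain loop are illustrative only, not the actual Lucene code.
// Hedged sketch of the Event-Queue hand-off between DocumentsWriter and IndexWriter.
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

interface IndexWriterLike {                       // illustrative stand-in for IndexWriter
  void applyDeletesAndPurge(boolean forcePurge) throws IOException;
}

interface Event {                                 // same shape as IndexWriter.Event in this change
  void process(IndexWriterLike writer, boolean triggerMerge, boolean forcePurge) throws IOException;
}

final class EventQueueSketch {
  private final Queue<Event> events = new ConcurrentLinkedQueue<>();

  // producer side (DocumentsWriter/DWPT): record the work, never lock IW here
  void putEvent(Event event) {
    events.add(event);
  }

  // consumer side (IndexWriter): drain the queue under IW's own locking discipline
  void processEvents(IndexWriterLike writer, boolean triggerMerge, boolean forcePurge) throws IOException {
    for (Event event; (event = events.poll()) != null; ) {
      event.process(writer, triggerMerge, forcePurge);
    }
  }
}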
Documentation
@ -225,6 +231,11 @@ Changes in backwards compatibility policy
* LUCENE-5170: Changed method signatures of Analyzer.ReuseStrategy to take
Analyzer. Closeable interface was removed because the class was changed to
be stateless. (Uwe Schindler, Robert Muir, Shay Banon)
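For illustration of the new stateless API, a minimal custom strategy could look like the sketch below. It assumes the 4.5-era signatures getReusableComponents(Analyzer, String) / setReusableComponents(Analyzer, String, TokenStreamComponents) and the protected getStoredValue/setStoredValue helpers; treat the exact names as an approximation rather than a definitive API reference.
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Analyzer.ReuseStrategy;
import org.apache.lucene.analysis.Analyzer.TokenStreamComponents;

// Stateless strategy sketch: per-analyzer state lives on the Analyzer itself
// (via the protected storage hooks), not on the ReuseStrategy instance.
final class GlobalReuseSketch extends ReuseStrategy {
  @Override
  public TokenStreamComponents getReusableComponents(Analyzer analyzer, String fieldName) {
    // one cached TokenStreamComponents per analyzer, shared across all fields
    return (TokenStreamComponents) getStoredValue(analyzer);
  }

  @Override
  public void setReusableComponents(Analyzer analyzer, String fieldName, TokenStreamComponents components) {
    setStoredValue(analyzer, components);
  }
}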
Build
* SOLR-5159: Manifest includes non-parsed maven variables.
(Artem Karpenko via Steve Rowe)
======================= Lucene 4.4.0 =======================

View File

@ -19,18 +19,18 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.index.FieldInfos.FieldNumbers;
import org.apache.lucene.index.IndexWriter.Event;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.InfoStream;
@ -100,19 +100,15 @@ import org.apache.lucene.util.InfoStream;
*/
final class DocumentsWriter {
Directory directory;
private final Directory directory;
private volatile boolean closed;
final InfoStream infoStream;
Similarity similarity;
private final InfoStream infoStream;
List<String> newFiles;
private final LiveIndexWriterConfig config;
final IndexWriter indexWriter;
final LiveIndexWriterConfig indexWriterConfig;
private AtomicInteger numDocsInRAM = new AtomicInteger(0);
private final AtomicInteger numDocsInRAM = new AtomicInteger(0);
// TODO: cut over to BytesRefHash in BufferedDeletes
volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
@ -125,73 +121,72 @@ final class DocumentsWriter {
*/
private volatile boolean pendingChangesInCurrentFullFlush;
private Collection<String> abortedFiles; // List of files that were written before last abort()
final IndexingChain chain;
final DocumentsWriterPerThreadPool perThreadPool;
final FlushPolicy flushPolicy;
final DocumentsWriterFlushControl flushControl;
final Codec codec;
DocumentsWriter(Codec codec, LiveIndexWriterConfig config, Directory directory, IndexWriter writer, FieldNumbers globalFieldNumbers,
BufferedDeletesStream bufferedDeletesStream) {
this.codec = codec;
this.directory = directory;
this.indexWriter = writer;
this.infoStream = config.getInfoStream();
this.similarity = config.getSimilarity();
this.indexWriterConfig = writer.getConfig();
this.perThreadPool = config.getIndexerThreadPool();
this.chain = config.getIndexingChain();
this.perThreadPool.initialize(this, globalFieldNumbers, config);
flushPolicy = config.getFlushPolicy();
assert flushPolicy != null;
flushPolicy.init(this);
flushControl = new DocumentsWriterFlushControl(this, config);
}
private final IndexWriter writer;
private final Queue<Event> events;
synchronized void deleteQueries(final Query... queries) throws IOException {
DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directory) {
this.directory = directory;
this.config = config;
this.infoStream = config.getInfoStream();
this.perThreadPool = config.getIndexerThreadPool();
flushPolicy = config.getFlushPolicy();
this.writer = writer;
this.events = new ConcurrentLinkedQueue<Event>();
flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedDeletesStream);
}
synchronized boolean deleteQueries(final Query... queries) throws IOException {
// TODO why is this synchronized?
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(queries);
flushControl.doOnDelete();
if (flushControl.doApplyAllDeletes()) {
applyAllDeletes(deleteQueue);
}
return applyAllDeletes(deleteQueue);
}
// TODO: we could check w/ FreqProxTermsWriter: if the
// term doesn't exist, don't bother buffering into the
// per-DWPT map (but still must go into the global map)
synchronized void deleteTerms(final Term... terms) throws IOException {
synchronized boolean deleteTerms(final Term... terms) throws IOException {
// TODO why is this synchronized?
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(terms);
flushControl.doOnDelete();
if (flushControl.doApplyAllDeletes()) {
applyAllDeletes(deleteQueue);
}
return applyAllDeletes(deleteQueue);
}
DocumentsWriterDeleteQueue currentDeleteSession() {
return deleteQueue;
}
private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
if (deleteQueue != null && !flushControl.isFullFlush()) {
ticketQueue.addDeletesAndPurge(this, deleteQueue);
private final boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
if (flushControl.doApplyAllDeletes()) {
if (deleteQueue != null && !flushControl.isFullFlush()) {
ticketQueue.addDeletes(deleteQueue);
}
putEvent(ApplyDeletesEvent.INSTANCE); // apply deletes event forces a purge
return true;
}
indexWriter.applyAllDeletes();
indexWriter.flushCount.incrementAndGet();
return false;
}
final int purgeBuffer(IndexWriter writer, boolean forced) throws IOException {
if (forced) {
return ticketQueue.forcePurge(writer);
} else {
return ticketQueue.tryPurge(writer);
}
}
/** Returns how many docs are currently buffered in RAM. */
int getNumDocs() {
return numDocsInRAM.get();
}
Collection<String> abortedFiles() {
return abortedFiles;
}
private void ensureOpen() throws AlreadyClosedException {
if (closed) {
throw new AlreadyClosedException("this IndexWriter is closed");
@ -202,45 +197,37 @@ final class DocumentsWriter {
* updating the index files) and must discard all
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
synchronized void abort() {
synchronized void abort(IndexWriter writer) {
assert !Thread.holdsLock(writer) : "IndexWriter lock should never be held when aborting";
boolean success = false;
final Set<String> newFilesSet = new HashSet<String>();
try {
deleteQueue.clear();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "abort");
}
final int limit = perThreadPool.getActiveThreadState();
for (int i = 0; i < limit; i++) {
final ThreadState perThread = perThreadPool.getThreadState(i);
perThread.lock();
try {
if (perThread.isActive()) { // we might be closed
try {
perThread.dwpt.abort();
} finally {
perThread.dwpt.checkAndResetHasAborted();
flushControl.doOnAbort(perThread);
}
} else {
assert closed;
}
abortThreadState(perThread, newFilesSet);
} finally {
perThread.unlock();
}
}
flushControl.abortPendingFlushes();
flushControl.abortPendingFlushes(newFilesSet);
putEvent(new DeleteNewFilesEvent(newFilesSet));
flushControl.waitForFlush();
success = true;
} finally {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "done abort; abortedFiles=" + abortedFiles + " success=" + success);
infoStream.message("DW", "done abort; abortedFiles=" + newFilesSet + " success=" + success);
}
}
}
synchronized void lockAndAbortAll() {
synchronized void lockAndAbortAll(IndexWriter indexWriter) {
assert indexWriter.holdsFullFlushLock();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "lockAndAbortAll");
@ -249,20 +236,15 @@ final class DocumentsWriter {
try {
deleteQueue.clear();
final int limit = perThreadPool.getMaxThreadStates();
final Set<String> newFilesSet = new HashSet<String>();
for (int i = 0; i < limit; i++) {
final ThreadState perThread = perThreadPool.getThreadState(i);
perThread.lock();
if (perThread.isActive()) { // we might be closed or
try {
perThread.dwpt.abort();
} finally {
perThread.dwpt.checkAndResetHasAborted();
flushControl.doOnAbort(perThread);
}
}
abortThreadState(perThread, newFilesSet);
}
deleteQueue.clear();
flushControl.abortPendingFlushes();
flushControl.abortPendingFlushes(newFilesSet);
putEvent(new DeleteNewFilesEvent(newFilesSet));
flushControl.waitForFlush();
success = true;
} finally {
@ -271,12 +253,31 @@ final class DocumentsWriter {
}
if (!success) {
// if something happens here we unlock all states again
unlockAllAfterAbortAll();
unlockAllAfterAbortAll(indexWriter);
}
}
}
private final void abortThreadState(final ThreadState perThread, Set<String> newFiles) {
assert perThread.isHeldByCurrentThread();
if (perThread.isActive()) { // we might be closed
if (perThread.isInitialized()) {
try {
subtractFlushedNumDocs(perThread.dwpt.getNumDocsInRAM());
perThread.dwpt.abort(newFiles);
} finally {
perThread.dwpt.checkAndResetHasAborted();
flushControl.doOnAbort(perThread);
}
} else {
flushControl.doOnAbort(perThread);
}
} else {
assert closed;
}
}
final synchronized void unlockAllAfterAbortAll() {
final synchronized void unlockAllAfterAbortAll(IndexWriter indexWriter) {
assert indexWriter.holdsFullFlushLock();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "unlockAll");
@ -334,7 +335,7 @@ final class DocumentsWriter {
private boolean preUpdate() throws IOException {
ensureOpen();
boolean maybeMerge = false;
boolean hasEvents = false;
if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
// Help out flushing any queued DWPTs so we can un-stall:
if (infoStream.isEnabled("DW")) {
@ -345,7 +346,7 @@ final class DocumentsWriter {
DocumentsWriterPerThread flushingDWPT;
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
// Don't push the delete here since the update could fail!
maybeMerge |= doFlush(flushingDWPT);
hasEvents |= doFlush(flushingDWPT);
}
if (infoStream.isEnabled("DW")) {
@ -361,28 +362,35 @@ final class DocumentsWriter {
infoStream.message("DW", "continue indexing after helping out flushing DocumentsWriter is healthy");
}
}
return maybeMerge;
return hasEvents;
}
private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException {
if (flushControl.doApplyAllDeletes()) {
applyAllDeletes(deleteQueue);
}
private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean hasEvents) throws IOException {
hasEvents |= applyAllDeletes(deleteQueue);
if (flushingDWPT != null) {
maybeMerge |= doFlush(flushingDWPT);
hasEvents |= doFlush(flushingDWPT);
} else {
final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
if (nextPendingFlush != null) {
maybeMerge |= doFlush(nextPendingFlush);
hasEvents |= doFlush(nextPendingFlush);
}
}
return maybeMerge;
return hasEvents;
}
private final void ensureInitialized(ThreadState state) {
if (state.isActive() && state.dwpt == null) {
final FieldInfos.Builder infos = new FieldInfos.Builder(
writer.globalFieldNumberMap);
state.dwpt = new DocumentsWriterPerThread(writer.newSegmentName(),
directory, config, infoStream, deleteQueue, infos);
}
}
boolean updateDocuments(final Iterable<? extends IndexDocument> docs, final Analyzer analyzer,
final Term delTerm) throws IOException {
boolean maybeMerge = preUpdate();
boolean hasEvents = preUpdate();
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
@ -392,13 +400,19 @@ final class DocumentsWriter {
ensureOpen();
assert false: "perThread is not active but we are still open";
}
ensureInitialized(perThread);
assert perThread.isInitialized();
final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm);
numDocsInRAM.addAndGet(docCount);
} finally {
if (dwpt.checkAndResetHasAborted()) {
if (!dwpt.pendingFilesToDelete().isEmpty()) {
putEvent(new DeleteNewFilesEvent(dwpt.pendingFilesToDelete()));
}
subtractFlushedNumDocs(dwptNumDocs);
flushControl.doOnAbort(perThread);
}
}
@ -408,31 +422,35 @@ final class DocumentsWriter {
perThread.unlock();
}
return postUpdate(flushingDWPT, maybeMerge);
return postUpdate(flushingDWPT, hasEvents);
}
boolean updateDocument(final IndexDocument doc, final Analyzer analyzer,
final Term delTerm) throws IOException {
boolean maybeMerge = preUpdate();
boolean hasEvents = preUpdate();
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
try {
if (!perThread.isActive()) {
ensureOpen();
throw new IllegalStateException("perThread is not active but we are still open");
assert false: "perThread is not active but we are still open";
}
ensureInitialized(perThread);
assert perThread.isInitialized();
final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
dwpt.updateDocument(doc, analyzer, delTerm);
numDocsInRAM.incrementAndGet();
} finally {
if (dwpt.checkAndResetHasAborted()) {
if (!dwpt.pendingFilesToDelete().isEmpty()) {
putEvent(new DeleteNewFilesEvent(dwpt.pendingFilesToDelete()));
}
subtractFlushedNumDocs(dwptNumDocs);
flushControl.doOnAbort(perThread);
}
}
@ -442,13 +460,13 @@ final class DocumentsWriter {
perThread.unlock();
}
return postUpdate(flushingDWPT, maybeMerge);
return postUpdate(flushingDWPT, hasEvents);
}
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
boolean maybeMerge = false;
boolean hasEvents = false;
while (flushingDWPT != null) {
maybeMerge = true;
hasEvents = true;
boolean success = false;
SegmentFlushTicket ticket = null;
try {
@ -474,9 +492,24 @@ final class DocumentsWriter {
// Each flush is assigned a ticket in the order they acquire the ticketQueue lock
ticket = ticketQueue.addFlushTicket(flushingDWPT);
// flush concurrently without locking
final FlushedSegment newSegment = flushingDWPT.flush();
ticketQueue.addSegment(ticket, newSegment);
final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
boolean dwptSuccess = false;
try {
// flush concurrently without locking
final FlushedSegment newSegment = flushingDWPT.flush();
ticketQueue.addSegment(ticket, newSegment);
dwptSuccess = true;
} finally {
subtractFlushedNumDocs(flushingDocsInRam);
if (!flushingDWPT.pendingFilesToDelete().isEmpty()) {
putEvent(new DeleteNewFilesEvent(flushingDWPT.pendingFilesToDelete()));
hasEvents = true;
}
if (!dwptSuccess) {
putEvent(new FlushFailedEvent(flushingDWPT.getSegmentInfo()));
hasEvents = true;
}
}
// flush was successful once we reached this point - new seg. has been assigned to the ticket!
success = true;
} finally {
@ -496,54 +529,38 @@ final class DocumentsWriter {
// thread in innerPurge can't keep up with all
// other threads flushing segments. In this case
// we forcefully stall the producers.
ticketQueue.forcePurge(this);
} else {
ticketQueue.tryPurge(this);
putEvent(ForcedPurgeEvent.INSTANCE);
break;
}
} finally {
flushControl.doAfterFlush(flushingDWPT);
flushingDWPT.checkAndResetHasAborted();
indexWriter.flushCount.incrementAndGet();
indexWriter.doAfterFlush();
}
flushingDWPT = flushControl.nextPendingFlush();
}
if (hasEvents) {
putEvent(MergePendingEvent.INSTANCE);
}
// If deletes alone are consuming > 1/2 our RAM
// buffer, force them all to apply now. This is to
// prevent too-frequent flushing of a long tail of
// tiny segments:
final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
final double ramBufferSizeMB = config.getRAMBufferSizeMB();
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "force apply deletes bytesUsed=" + flushControl.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*ramBufferSizeMB));
}
applyAllDeletes(deleteQueue);
hasEvents = true;
if (!this.applyAllDeletes(deleteQueue)) {
putEvent(ApplyDeletesEvent.INSTANCE);
}
}
return maybeMerge;
return hasEvents;
}
void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
throws IOException {
// Finish the flushed segment and publish it to IndexWriter
if (newSegment == null) {
assert bufferedDeletes != null;
if (bufferedDeletes != null && bufferedDeletes.any()) {
indexWriter.publishFrozenDeletes(bufferedDeletes);
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "flush: push buffered deletes: " + bufferedDeletes);
}
}
} else {
publishFlushedSegment(newSegment, bufferedDeletes);
}
}
final void subtractFlushedNumDocs(int numFlushed) {
int oldValue = numDocsInRAM.get();
while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
@ -551,29 +568,6 @@ final class DocumentsWriter {
}
}
/**
* Publishes the flushed segment, segment private deletes (if any) and its
* associated global delete (if present) to IndexWriter. The actual
* publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
* delete generation is always GlobalPacket_deleteGeneration + 1
*/
private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
throws IOException {
assert newSegment != null;
assert newSegment.segmentInfo != null;
final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes;
//System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes);
}
if (segmentDeletes != null && infoStream.isEnabled("DW")) {
infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes);
}
// now publish!
indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket);
}
// for asserts
private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null;
@ -588,7 +582,7 @@ final class DocumentsWriter {
* two stage operation; the caller must ensure (in try/finally) that finishFlush
* is called after this method, to release the flush lock in DWFlushControl
*/
final boolean flushAllThreads()
final boolean flushAllThreads(final IndexWriter indexWriter)
throws IOException {
final DocumentsWriterDeleteQueue flushingDeleteQueue;
if (infoStream.isEnabled("DW")) {
@ -620,10 +614,9 @@ final class DocumentsWriter {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
ticketQueue.addDeletesAndPurge(this, flushingDeleteQueue);
} else {
ticketQueue.forcePurge(this);
}
ticketQueue.addDeletes(flushingDeleteQueue);
}
ticketQueue.forcePurge(indexWriter);
assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
} finally {
assert flushingDeleteQueue == currentFullFlushDelQueue;
@ -641,11 +634,94 @@ final class DocumentsWriter {
// Release the flush lock
flushControl.finishFullFlush();
} else {
flushControl.abortFullFlushes();
Set<String> newFilesSet = new HashSet<>();
flushControl.abortFullFlushes(newFilesSet);
putEvent(new DeleteNewFilesEvent(newFilesSet));
}
} finally {
pendingChangesInCurrentFullFlush = false;
}
}
public LiveIndexWriterConfig getIndexWriterConfig() {
return config;
}
private void putEvent(Event event) {
events.add(event);
}
static final class ApplyDeletesEvent implements Event {
static final Event INSTANCE = new ApplyDeletesEvent();
private int instCount = 0;
private ApplyDeletesEvent() {
assert instCount == 0;
instCount++;
}
@Override
public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
writer.applyDeletesAndPurge(true); // we always purge!
}
}
static final class MergePendingEvent implements Event {
static final Event INSTANCE = new MergePendingEvent();
private int instCount = 0;
private MergePendingEvent() {
assert instCount == 0;
instCount++;
}
@Override
public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
writer.doAfterSegmentFlushed(triggerMerge, forcePurge);
}
}
static final class ForcedPurgeEvent implements Event {
static final Event INSTANCE = new ForcedPurgeEvent();
private int instCount = 0;
private ForcedPurgeEvent() {
assert instCount == 0;
instCount++;
}
@Override
public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
writer.purge(true);
}
}
static class FlushFailedEvent implements Event {
private final SegmentInfo info;
public FlushFailedEvent(SegmentInfo info) {
this.info = info;
}
@Override
public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
writer.flushFailed(info);
}
}
static class DeleteNewFilesEvent implements Event {
private final Collection<String> files;
public DeleteNewFilesEvent(Collection<String> files) {
this.files = files;
}
@Override
public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
writer.deleteNewFiles(files);
}
}
public Queue<Event> eventQueue() {
return events;
}
}

View File

@ -23,9 +23,11 @@ import java.util.List;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.ThreadInterruptedException;
/**
@ -66,14 +68,18 @@ final class DocumentsWriterFlushControl {
private boolean closed = false;
private final DocumentsWriter documentsWriter;
private final LiveIndexWriterConfig config;
private final BufferedDeletesStream bufferedDeletesStream;
private final InfoStream infoStream;
DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config) {
DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config, BufferedDeletesStream bufferedDeletesStream) {
this.infoStream = config.getInfoStream();
this.stallControl = new DocumentsWriterStallControl();
this.perThreadPool = documentsWriter.perThreadPool;
this.flushPolicy = documentsWriter.flushPolicy;
this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
this.config = config;
this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
this.documentsWriter = documentsWriter;
this.bufferedDeletesStream = bufferedDeletesStream;
}
public synchronized long activeBytes() {
@ -240,7 +246,6 @@ final class DocumentsWriterFlushControl {
}
public synchronized void waitForFlush() {
assert !Thread.holdsLock(this.documentsWriter.indexWriter) : "IW lock should never be held when waiting on flush";
while (flushingWriters.size() != 0) {
try {
this.wait();
@ -277,7 +282,7 @@ final class DocumentsWriterFlushControl {
}
assert assertMemory();
// Take it out of the loop this DWPT is stale
perThreadPool.replaceForFlush(state, closed);
perThreadPool.reset(state, closed);
} finally {
updateStallState();
}
@ -295,7 +300,7 @@ final class DocumentsWriterFlushControl {
assert fullFlush : "can not block if fullFlush == false";
final DocumentsWriterPerThread dwpt;
final long bytes = perThread.bytesUsed;
dwpt = perThreadPool.replaceForFlush(perThread, closed);
dwpt = perThreadPool.reset(perThread, closed);
numPending--;
blockedFlushes.add(new BlockedFlush(dwpt, bytes));
}finally {
@ -311,12 +316,12 @@ final class DocumentsWriterFlushControl {
// We are pending so all memory is already moved to flushBytes
if (perThread.tryLock()) {
try {
if (perThread.isActive()) {
if (perThread.isInitialized()) {
assert perThread.isHeldByCurrentThread();
final DocumentsWriterPerThread dwpt;
final long bytes = perThread.bytesUsed; // do that before
// replace!
dwpt = perThreadPool.replaceForFlush(perThread, closed);
dwpt = perThreadPool.reset(perThread, closed);
assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
// Record the flushing DWPT to reduce flushBytes in doAfterFlush
flushingWriters.put(dwpt, Long.valueOf(bytes));
@ -413,11 +418,11 @@ final class DocumentsWriterFlushControl {
* Returns the number of delete terms in the global pool
*/
public int getNumGlobalTermDeletes() {
return documentsWriter.deleteQueue.numGlobalTermDeletes() + documentsWriter.indexWriter.bufferedDeletesStream.numTerms();
return documentsWriter.deleteQueue.numGlobalTermDeletes() + bufferedDeletesStream.numTerms();
}
public long getDeleteBytesUsed() {
return documentsWriter.deleteQueue.bytesUsed() + documentsWriter.indexWriter.bufferedDeletesStream.bytesUsed();
return documentsWriter.deleteQueue.bytesUsed() + bufferedDeletesStream.bytesUsed();
}
synchronized int numFlushingDWPT() {
@ -441,7 +446,7 @@ final class DocumentsWriterFlushControl {
.currentThread(), documentsWriter);
boolean success = false;
try {
if (perThread.isActive()
if (perThread.isInitialized()
&& perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
// There is a flush-all in process and this DWPT is
// now stale -- enroll it for flush and try for
@ -475,7 +480,10 @@ final class DocumentsWriterFlushControl {
final ThreadState next = perThreadPool.getThreadState(i);
next.lock();
try {
if (!next.isActive()) {
if (!next.isInitialized()) {
if (closed && next.isActive()) {
perThreadPool.deactivateThreadState(next);
}
continue;
}
assert next.dwpt.deleteQueue == flushingQueue
@ -515,7 +523,7 @@ final class DocumentsWriterFlushControl {
final ThreadState next = perThreadPool.getThreadState(i);
next.lock();
try {
assert !next.isActive() || next.dwpt.deleteQueue == queue;
assert !next.isInitialized() || next.dwpt.deleteQueue == queue : "isInitialized: " + next.isInitialized() + " numDocs: " + (next.isInitialized() ? next.dwpt.getNumDocsInRAM() : 0) ;
} finally {
next.unlock();
}
@ -526,12 +534,12 @@ final class DocumentsWriterFlushControl {
private final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<DocumentsWriterPerThread>();
void addFlushableState(ThreadState perThread) {
if (documentsWriter.infoStream.isEnabled("DWFC")) {
documentsWriter.infoStream.message("DWFC", "addFlushableState " + perThread.dwpt);
if (infoStream.isEnabled("DWFC")) {
infoStream.message("DWFC", "addFlushableState " + perThread.dwpt);
}
final DocumentsWriterPerThread dwpt = perThread.dwpt;
assert perThread.isHeldByCurrentThread();
assert perThread.isActive();
assert perThread.isInitialized();
assert fullFlush;
assert dwpt.deleteQueue != documentsWriter.deleteQueue;
if (dwpt.getNumDocsInRAM() > 0) {
@ -545,11 +553,7 @@ final class DocumentsWriterFlushControl {
fullFlushBuffer.add(flushingDWPT);
}
} else {
if (closed) {
perThreadPool.deactivateThreadState(perThread); // make this state inactive
} else {
perThreadPool.reinitThreadState(perThread);
}
perThreadPool.reset(perThread, closed); // make this state inactive
}
}
@ -594,19 +598,20 @@ final class DocumentsWriterFlushControl {
return true;
}
synchronized void abortFullFlushes() {
synchronized void abortFullFlushes(Set<String> newFiles) {
try {
abortPendingFlushes();
abortPendingFlushes(newFiles);
} finally {
fullFlush = false;
}
}
synchronized void abortPendingFlushes() {
synchronized void abortPendingFlushes(Set<String> newFiles) {
try {
for (DocumentsWriterPerThread dwpt : flushQueue) {
try {
dwpt.abort();
documentsWriter.subtractFlushedNumDocs(dwpt.getNumDocsInRAM());
dwpt.abort(newFiles);
} catch (Throwable ex) {
// ignore - keep on aborting the flush queue
} finally {
@ -617,7 +622,8 @@ final class DocumentsWriterFlushControl {
try {
flushingWriters
.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
blockedFlush.dwpt.abort();
documentsWriter.subtractFlushedNumDocs(blockedFlush.dwpt.getNumDocsInRAM());
blockedFlush.dwpt.abort(newFiles);
} catch (Throwable ex) {
// ignore - keep on aborting the blocked queue
} finally {
@ -670,8 +676,8 @@ final class DocumentsWriterFlushControl {
* checked out DWPT are available
*/
void waitIfStalled() {
if (documentsWriter.infoStream.isEnabled("DWFC")) {
documentsWriter.infoStream.message("DWFC",
if (infoStream.isEnabled("DWFC")) {
infoStream.message("DWFC",
"waitIfStalled: numFlushesPending: " + flushQueue.size()
+ " netBytes: " + netBytes() + " flushBytes: " + flushBytes()
+ " fullFlush: " + fullFlush);
@ -686,5 +692,12 @@ final class DocumentsWriterFlushControl {
return stallControl.anyStalledThreads();
}
/**
* Returns the {@link IndexWriter} {@link InfoStream}
*/
public InfoStream getInfoStream() {
return infoStream;
}
}

View File

@ -34,8 +34,7 @@ class DocumentsWriterFlushQueue {
private final AtomicInteger ticketCount = new AtomicInteger();
private final ReentrantLock purgeLock = new ReentrantLock();
void addDeletesAndPurge(DocumentsWriter writer,
DocumentsWriterDeleteQueue deleteQueue) throws IOException {
void addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
synchronized (this) {
incTickets();// first inc the ticket count - freeze opens
// a window for #anyChanges to fail
@ -49,9 +48,6 @@ class DocumentsWriterFlushQueue {
}
}
}
// don't hold the lock on the FlushQueue when forcing the purge - this blocks and deadlocks
// if we hold the lock.
forcePurge(writer);
}
private void incTickets() {
@ -98,8 +94,9 @@ class DocumentsWriterFlushQueue {
return ticketCount.get() != 0;
}
private void innerPurge(DocumentsWriter writer) throws IOException {
private int innerPurge(IndexWriter writer) throws IOException {
assert purgeLock.isHeldByCurrentThread();
int numPurged = 0;
while (true) {
final FlushTicket head;
final boolean canPublish;
@ -108,6 +105,7 @@ class DocumentsWriterFlushQueue {
canPublish = head != null && head.canPublish(); // do this synced
}
if (canPublish) {
numPurged++;
try {
/*
* if we block on publish -> lock IW -> lock BufferedDeletes we don't block
@ -116,6 +114,7 @@ class DocumentsWriterFlushQueue {
* be a ticket still in the queue.
*/
head.publish(writer);
} finally {
synchronized (this) {
// finally remove the published ticket from the queue
@ -128,27 +127,31 @@ class DocumentsWriterFlushQueue {
break;
}
}
return numPurged;
}
void forcePurge(DocumentsWriter writer) throws IOException {
int forcePurge(IndexWriter writer) throws IOException {
assert !Thread.holdsLock(this);
assert !Thread.holdsLock(writer);
purgeLock.lock();
try {
innerPurge(writer);
return innerPurge(writer);
} finally {
purgeLock.unlock();
}
}
void tryPurge(DocumentsWriter writer) throws IOException {
int tryPurge(IndexWriter writer) throws IOException {
assert !Thread.holdsLock(this);
assert !Thread.holdsLock(writer);
if (purgeLock.tryLock()) {
try {
innerPurge(writer);
return innerPurge(writer);
} finally {
purgeLock.unlock();
}
}
return 0;
}
public int getTicketCount() {
@ -169,8 +172,47 @@ class DocumentsWriterFlushQueue {
this.frozenDeletes = frozenDeletes;
}
protected abstract void publish(DocumentsWriter writer) throws IOException;
protected abstract void publish(IndexWriter writer) throws IOException;
protected abstract boolean canPublish();
/**
* Publishes the flushed segment, segment private deletes (if any) and its
* associated global delete (if present) to IndexWriter. The actual
* publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
* delete generation is always GlobalPacket_deleteGeneration + 1
*/
protected final void publishFlushedSegment(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
throws IOException {
assert newSegment != null;
assert newSegment.segmentInfo != null;
final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes;
//System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
if (indexWriter.infoStream.isEnabled("DW")) {
indexWriter.infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes);
}
if (segmentDeletes != null && indexWriter.infoStream.isEnabled("DW")) {
indexWriter.infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes);
}
// now publish!
indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket);
}
protected final void finishFlush(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
throws IOException {
// Finish the flushed segment and publish it to IndexWriter
if (newSegment == null) {
assert bufferedDeletes != null;
if (bufferedDeletes != null && bufferedDeletes.any()) {
indexWriter.publishFrozenDeletes(bufferedDeletes);
if (indexWriter.infoStream.isEnabled("DW")) {
indexWriter.infoStream.message("DW", "flush: push buffered deletes: " + bufferedDeletes);
}
}
} else {
publishFlushedSegment(indexWriter, newSegment, bufferedDeletes);
}
}
}
static final class GlobalDeletesTicket extends FlushTicket {
@ -179,11 +221,11 @@ class DocumentsWriterFlushQueue {
super(frozenDeletes);
}
@Override
protected void publish(DocumentsWriter writer) throws IOException {
protected void publish(IndexWriter writer) throws IOException {
assert !published : "ticket was already publised - can not publish twice";
published = true;
// its a global ticket - no segment to publish
writer.finishFlush(null, frozenDeletes);
finishFlush(writer, null, frozenDeletes);
}
@Override
@ -201,10 +243,10 @@ class DocumentsWriterFlushQueue {
}
@Override
protected void publish(DocumentsWriter writer) throws IOException {
protected void publish(IndexWriter writer) throws IOException {
assert !published : "ticket was already publised - can not publish twice";
published = true;
writer.finishFlush(segment, frozenDeletes);
finishFlush(writer, segment, frozenDeletes);
}
protected void setSegment(FlushedSegment segment) {

View File

@ -22,6 +22,7 @@ import java.text.NumberFormat;
import java.util.Collection;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
@ -144,7 +145,7 @@ class DocumentsWriterPerThread {
* updating the index files) and must discard all
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
void abort() {
void abort(Set<String> createdFiles) {
//System.out.println(Thread.currentThread().getName() + ": now abort seg=" + segmentInfo.name);
hasAborted = aborting = true;
try {
@ -157,10 +158,7 @@ class DocumentsWriterPerThread {
}
pendingDeletes.clear();
deleteSlice = deleteQueue.newSlice();
// Reset all postings data
doAfterFlush();
createdFiles.addAll(directory.getCreatedFiles());
} finally {
aborting = false;
if (infoStream.isEnabled("DWPT")) {
@ -169,83 +167,77 @@ class DocumentsWriterPerThread {
}
}
private final static boolean INFO_VERBOSE = false;
final DocumentsWriter parent;
final Codec codec;
final IndexWriter writer;
final TrackingDirectoryWrapper directory;
final Directory directoryOrig;
final DocState docState;
final DocConsumer consumer;
final Counter bytesUsed;
SegmentWriteState flushState;
//Deletes for our still-in-RAM (to be flushed next) segment
BufferedDeletes pendingDeletes;
SegmentInfo segmentInfo; // Current segment we are working on
final BufferedDeletes pendingDeletes;
private final SegmentInfo segmentInfo; // Current segment we are working on
boolean aborting = false; // True if an abort is pending
boolean hasAborted = false; // True if the last exception throws by #updateDocument was aborting
private FieldInfos.Builder fieldInfos;
private final InfoStream infoStream;
private int numDocsInRAM;
private int flushedDocCount;
DocumentsWriterDeleteQueue deleteQueue;
DeleteSlice deleteSlice;
final DocumentsWriterDeleteQueue deleteQueue;
private final DeleteSlice deleteSlice;
private final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
final Allocator byteBlockAllocator;
final IntBlockPool.Allocator intBlockAllocator;
private final LiveIndexWriterConfig indexWriterConfig;
public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent,
FieldInfos.Builder fieldInfos, IndexingChain indexingChain) {
public DocumentsWriterPerThread(String segmentName, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
FieldInfos.Builder fieldInfos) {
this.directoryOrig = directory;
this.directory = new TrackingDirectoryWrapper(directory);
this.parent = parent;
this.fieldInfos = fieldInfos;
this.writer = parent.indexWriter;
this.indexWriterConfig = parent.indexWriterConfig;
this.infoStream = parent.infoStream;
this.codec = parent.codec;
this.indexWriterConfig = indexWriterConfig;
this.infoStream = infoStream;
this.codec = indexWriterConfig.getCodec();
this.docState = new DocState(this, infoStream);
this.docState.similarity = parent.indexWriter.getConfig().getSimilarity();
this.docState.similarity = indexWriterConfig.getSimilarity();
bytesUsed = Counter.newCounter();
byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
pendingDeletes = new BufferedDeletes();
intBlockAllocator = new IntBlockAllocator(bytesUsed);
initialize();
// this should be the last call in the ctor
// it really sucks that we need to pull this within the ctor and pass this ref to the chain!
consumer = indexingChain.getChain(this);
}
public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos.Builder fieldInfos) {
this(other.directoryOrig, other.parent, fieldInfos, other.parent.chain);
}
void initialize() {
deleteQueue = parent.deleteQueue;
this.deleteQueue = deleteQueue;
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
pendingDeletes.clear();
deleteSlice = null;
}
deleteSlice = deleteQueue.newSlice();
segmentInfo = new SegmentInfo(directoryOrig, Constants.LUCENE_MAIN_VERSION, segmentName, -1,
false, codec, null, null);
assert numDocsInRAM == 0;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);
}
// this should be the last call in the ctor
// it really sucks that we need to pull this within the ctor and pass this ref to the chain!
consumer = indexWriterConfig.getIndexingChain().getChain(this);
}
void setAborting() {
aborting = true;
}
final boolean testPoint(String message) {
if (infoStream.isEnabled("TP")) {
infoStream.message("TP", message);
}
return true;
}
boolean checkAndResetHasAborted() {
final boolean retval = hasAborted;
hasAborted = false;
return retval;
}
final boolean testPoint(String message) {
if (infoStream.isEnabled("TP")) {
infoStream.message("TP", message);
}
return true;
}
public void updateDocument(IndexDocument doc, Analyzer analyzer, Term delTerm) throws IOException {
assert testPoint("DocumentsWriterPerThread addDocument start");
@ -253,9 +245,6 @@ class DocumentsWriterPerThread {
docState.doc = doc;
docState.analyzer = analyzer;
docState.docID = numDocsInRAM;
if (segmentInfo == null) {
initSegmentInfo();
}
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
}
@ -274,7 +263,7 @@ class DocumentsWriterPerThread {
deleteDocID(docState.docID);
numDocsInRAM++;
} else {
abort();
abort(filesToDelete);
}
}
}
@ -284,29 +273,16 @@ class DocumentsWriterPerThread {
success = true;
} finally {
if (!success) {
abort();
abort(filesToDelete);
}
}
finishDocument(delTerm);
}
private void initSegmentInfo() {
String segment = writer.newSegmentName();
segmentInfo = new SegmentInfo(directoryOrig, Constants.LUCENE_MAIN_VERSION, segment, -1,
false, codec, null, null);
assert numDocsInRAM == 0;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue);
}
}
public int updateDocuments(Iterable<? extends IndexDocument> docs, Analyzer analyzer, Term delTerm) throws IOException {
assert testPoint("DocumentsWriterPerThread addDocuments start");
assert deleteQueue != null;
docState.analyzer = analyzer;
if (segmentInfo == null) {
initSegmentInfo();
}
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
}
@ -331,7 +307,7 @@ class DocumentsWriterPerThread {
// be called (because an exc is being thrown):
numDocsInRAM++;
} else {
abort();
abort(filesToDelete);
}
}
}
@ -341,7 +317,7 @@ class DocumentsWriterPerThread {
success = true;
} finally {
if (!success) {
abort();
abort(filesToDelete);
}
}
@ -384,21 +360,18 @@ class DocumentsWriterPerThread {
* the updated slice we get from 1. holds all the deletes that have occurred
* since we updated the slice the last time.
*/
if (deleteSlice == null) {
deleteSlice = deleteQueue.newSlice();
if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice);
deleteSlice.reset();
}
} else {
if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
deleteSlice.apply(pendingDeletes, numDocsInRAM);
} else if (deleteQueue.updateSlice(deleteSlice)) {
deleteSlice.apply(pendingDeletes, numDocsInRAM);
}
boolean applySlice = numDocsInRAM != 0;
if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
} else {
applySlice &= deleteQueue.updateSlice(deleteSlice);
}
if (applySlice) {
deleteSlice.apply(pendingDeletes, numDocsInRAM);
} else { // if we don't need to apply we must reset!
deleteSlice.reset();
}
++numDocsInRAM;
}
@ -434,14 +407,6 @@ class DocumentsWriterPerThread {
return numDocsInRAM;
}
/** Reset after a flush */
private void doAfterFlush() {
segmentInfo = null;
directory.getCreatedFiles().clear();
fieldInfos = new FieldInfos.Builder(fieldInfos.globalFieldNumbers);
parent.subtractFlushedNumDocs(numDocsInRAM);
numDocsInRAM = 0;
}
/**
* Prepares this DWPT for flushing. This method will freeze and return the
@ -457,7 +422,7 @@ class DocumentsWriterPerThread {
// apply all deletes before we flush and release the delete slice
deleteSlice.apply(pendingDeletes, numDocsInRAM);
assert deleteSlice.isEmpty();
deleteSlice = null;
deleteSlice.reset();
}
return globalDeletes;
}
@ -465,11 +430,11 @@ class DocumentsWriterPerThread {
/** Flush all pending docs to a new segment */
FlushedSegment flush() throws IOException {
assert numDocsInRAM > 0;
assert deleteSlice == null : "all deletes must be applied in prepareFlush";
assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
segmentInfo.setDocCount(numDocsInRAM);
flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
pendingDeletes, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
final double startMBUsed = parent.flushControl.netBytes() / 1024. / 1024.;
final double startMBUsed = bytesUsed() / 1024. / 1024.;
// Apply delete-by-docID now (delete-byDocID only
// happens when an exception is hit processing that
@ -515,15 +480,11 @@ class DocumentsWriterPerThread {
infoStream.message("DWPT", "flushed codec=" + codec);
}
flushedDocCount += flushState.segmentInfo.getDocCount();
final BufferedDeletes segmentDeletes;
if (pendingDeletes.queries.isEmpty()) {
pendingDeletes.clear();
segmentDeletes = null;
} else {
segmentDeletes = pendingDeletes;
pendingDeletes = new BufferedDeletes();
}
if (infoStream.isEnabled("DWPT")) {
@ -531,7 +492,7 @@ class DocumentsWriterPerThread {
infoStream.message("DWPT", "flushed: segment=" + segmentInfo.name +
" ramUsed=" + nf.format(startMBUsed) + " MB" +
" newFlushedSize(includes docstores)=" + nf.format(newSegmentSize) + " MB" +
" docs/MB=" + nf.format(flushedDocCount / newSegmentSize));
" docs/MB=" + nf.format(flushState.segmentInfo.getDocCount() / newSegmentSize));
}
assert segmentInfo != null;
@ -539,20 +500,21 @@ class DocumentsWriterPerThread {
FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos,
segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush);
sealFlushedSegment(fs);
doAfterFlush();
success = true;
return fs;
} finally {
if (!success) {
if (segmentInfo != null) {
writer.flushFailed(segmentInfo);
}
abort();
abort(filesToDelete);
}
}
}
private final Set<String> filesToDelete = new HashSet<String>();
public Set<String> pendingFilesToDelete() {
return filesToDelete;
}
/**
* Seals the {@link SegmentInfo} for the new flushed segment and persists
* the deleted documents {@link MutableBits}.
@ -568,12 +530,10 @@ class DocumentsWriterPerThread {
boolean success = false;
try {
if (indexWriterConfig.getUseCompoundFile()) {
// Now build compound file
Collection<String> oldFiles = IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context);
filesToDelete.addAll(IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context));
newSegment.info.setUseCompoundFile(true);
writer.deleteNewFiles(oldFiles);
}
// Have codec write SegmentInfo. Must do this after
@ -618,7 +578,6 @@ class DocumentsWriterPerThread {
infoStream.message("DWPT", "hit exception " +
"reating compound file for newly flushed segment " + newSegment.info.name);
}
writer.flushFailed(newSegment.info);
}
}
}
@ -671,4 +630,5 @@ class DocumentsWriterPerThread {
+ ", segment=" + (segmentInfo != null ? segmentInfo.name : "null") + ", aborting=" + aborting + ", numDocsInRAM="
+ numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
}
}

View File

@ -71,12 +71,16 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
* for indexing anymore.
* @see #isActive()
*/
private void resetWriter(DocumentsWriterPerThread dwpt) {
private void deactivate() {
assert this.isHeldByCurrentThread();
if (dwpt == null) {
isActive = false;
}
this.dwpt = dwpt;
isActive = false;
reset();
}
private void reset() {
assert this.isHeldByCurrentThread();
this.dwpt = null;
this.bytesUsed = 0;
this.flushPending = false;
}
@ -91,6 +95,11 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
return isActive;
}
boolean isInitialized() {
assert this.isHeldByCurrentThread();
return isActive() && dwpt != null;
}
/**
* Returns the number of currently active bytes in this ThreadState's
* {@link DocumentsWriterPerThread}
@ -121,9 +130,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
private ThreadState[] threadStates;
private volatile int numThreadStatesActive;
private SetOnce<FieldNumbers> globalFieldMap = new SetOnce<FieldNumbers>();
private SetOnce<DocumentsWriter> documentsWriter = new SetOnce<DocumentsWriter>();
/**
* Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s.
*/
@ -133,14 +140,8 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
}
threadStates = new ThreadState[maxNumThreadStates];
numThreadStatesActive = 0;
}
void initialize(DocumentsWriter documentsWriter, FieldNumbers globalFieldMap, LiveIndexWriterConfig config) {
this.documentsWriter.set(documentsWriter); // thread pool is bound to DW
this.globalFieldMap.set(globalFieldMap);
for (int i = 0; i < threadStates.length; i++) {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap);
threadStates[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
threadStates[i] = new ThreadState(null);
}
}
@ -158,9 +159,10 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
// should not happen
throw new RuntimeException(e);
}
clone.documentsWriter = new SetOnce<DocumentsWriter>();
clone.globalFieldMap = new SetOnce<FieldNumbers>();
clone.threadStates = new ThreadState[threadStates.length];
for (int i = 0; i < threadStates.length; i++) {
clone.threadStates[i] = new ThreadState(null);
}
return clone;
}
@ -178,6 +180,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
int getActiveThreadState() {
return numThreadStatesActive;
}
/**
* Returns a new {@link ThreadState} iff any new state is available otherwise
@ -198,8 +201,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
if (threadState.isActive()) {
// unreleased thread states are deactivated during DW#close()
numThreadStatesActive++; // increment will publish the ThreadState
assert threadState.dwpt != null;
threadState.dwpt.initialize();
assert threadState.dwpt == null;
unlock = false;
return threadState;
}
@ -220,7 +222,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
for (int i = numThreadStatesActive; i < threadStates.length; i++) {
assert threadStates[i].tryLock() : "unreleased threadstate should not be locked";
try {
assert !threadStates[i].isActive() : "expected unreleased thread state to be inactive";
assert !threadStates[i].isInitialized() : "expected unreleased thread state to be inactive";
} finally {
threadStates[i].unlock();
}
@ -236,24 +238,20 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
final ThreadState threadState = threadStates[i];
threadState.lock();
try {
threadState.resetWriter(null);
threadState.deactivate();
} finally {
threadState.unlock();
}
}
}
DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) {
DocumentsWriterPerThread reset(ThreadState threadState, boolean closed) {
assert threadState.isHeldByCurrentThread();
assert globalFieldMap.get() != null;
final DocumentsWriterPerThread dwpt = threadState.dwpt;
if (!closed) {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap.get());
final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos);
newDwpt.initialize();
threadState.resetWriter(newDwpt);
threadState.reset();
} else {
threadState.resetWriter(null);
threadState.deactivate();
}
return dwpt;
}
@ -328,18 +326,6 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
*/
void deactivateThreadState(ThreadState threadState) {
assert threadState.isActive();
threadState.resetWriter(null);
}
/**
* Reinitialized an active {@link ThreadState}. A {@link ThreadState} should
* only be reinitialized if it is active without any pending documents.
*
* @param threadState the state to reinitialize
*/
void reinitThreadState(ThreadState threadState) {
assert threadState.isActive;
assert threadState.dwpt.getNumDocsInRAM() == 0;
threadState.dwpt.initialize();
threadState.deactivate();
}
}

View File

@ -68,12 +68,11 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
control.setApplyAllDeletes();
}
}
final DocumentsWriter writer = this.writer.get();
if ((flushOnRAM() &&
control.getDeleteBytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()))) {
control.setApplyAllDeletes();
if (writer.infoStream.isEnabled("FP")) {
writer.infoStream.message("FP", "force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
if (infoStream.isEnabled("FP")) {
infoStream.message("FP", "force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
}
}
}
@ -89,9 +88,8 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
final long totalRam = control.activeBytes() + control.getDeleteBytesUsed();
if (totalRam >= limit) {
final DocumentsWriter writer = this.writer.get();
if (writer.infoStream.isEnabled("FP")) {
writer.infoStream.message("FP", "flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit);
if (infoStream.isEnabled("FP")) {
infoStream.message("FP", "flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit);
}
markLargestWriterPending(control, state, totalRam);
}

View File

@ -20,6 +20,7 @@ import java.util.Iterator;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SetOnce;
/**
@ -52,8 +53,8 @@ import org.apache.lucene.util.SetOnce;
* @see IndexWriterConfig#setFlushPolicy(FlushPolicy)
*/
abstract class FlushPolicy implements Cloneable {
protected SetOnce<DocumentsWriter> writer = new SetOnce<DocumentsWriter>();
protected LiveIndexWriterConfig indexWriterConfig;
protected InfoStream infoStream;
/**
* Called for each delete term. If this is a delete triggered due to an update
@ -93,9 +94,9 @@ abstract class FlushPolicy implements Cloneable {
/**
* Called by DocumentsWriter to initialize the FlushPolicy
*/
protected synchronized void init(DocumentsWriter docsWriter) {
writer.set(docsWriter);
indexWriterConfig = docsWriter.indexWriter.getConfig();
protected synchronized void init(LiveIndexWriterConfig indexWriterConfig) {
this.indexWriterConfig = indexWriterConfig;
infoStream = indexWriterConfig.getInfoStream();
}
/**
@ -127,8 +128,8 @@ abstract class FlushPolicy implements Cloneable {
}
private boolean assertMessage(String s) {
if (writer.get().infoStream.isEnabled("FP")) {
writer.get().infoStream.message("FP", s);
if (infoStream.isEnabled("FP")) {
infoStream.message("FP", s);
}
return true;
}
@ -142,8 +143,8 @@ abstract class FlushPolicy implements Cloneable {
// should not happen
throw new RuntimeException(e);
}
clone.writer = new SetOnce<DocumentsWriter>();
clone.indexWriterConfig = null;
clone.infoStream = null;
return clone;
}
}

View File

@ -30,6 +30,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@ -182,7 +183,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
* referenced by the "front" of the index). For this, IndexFileDeleter
* keeps track of the last non commit checkpoint.
*/
public class IndexWriter implements Closeable, TwoPhaseCommit {
public class IndexWriter implements Closeable, TwoPhaseCommit{
private static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1;
@ -227,6 +228,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
final FieldNumbers globalFieldNumberMap;
private DocumentsWriter docWriter;
private final Queue<Event> eventQueue;
final IndexFileDeleter deleter;
// used by forceMerge to note those needing merging
@ -360,7 +362,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
synchronized (fullFlushLock) {
boolean success = false;
try {
anySegmentFlushed = docWriter.flushAllThreads();
anySegmentFlushed = docWriter.flushAllThreads(this);
if (!anySegmentFlushed) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
@ -730,7 +732,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
// start with previous field numbers, but new FieldInfos
globalFieldNumberMap = getFieldNumberMap();
docWriter = new DocumentsWriter(codec, config, directory, this, globalFieldNumberMap, bufferedDeletesStream);
config.getFlushPolicy().init(config);
docWriter = new DocumentsWriter(this, config, directory);
eventQueue = docWriter.eventQueue();
// Default deleter (for backwards compatibility) is
// KeepOnlyLastCommitDeleter:
@ -961,7 +965,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
if (doFlush) {
flush(waitForMerges, true);
} else {
docWriter.abort(); // already closed -- never sync on IW
docWriter.abort(this); // already closed -- never sync on IW
}
} finally {
@ -1033,7 +1037,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
synchronized(this) {
closed = true;
}
assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates();
assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates() : "" + oldWriter.perThreadPool.numDeactivatedThreadStates() + " " + oldWriter.perThreadPool.getMaxThreadStates();
} catch (OutOfMemoryError oom) {
handleOOM(oom, "closeInternal");
} finally {
@ -1280,9 +1284,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
ensureOpen();
try {
boolean success = false;
boolean anySegmentFlushed = false;
try {
anySegmentFlushed = docWriter.updateDocuments(docs, analyzer, delTerm);
if (docWriter.updateDocuments(docs, analyzer, delTerm)) {
processEvents(true, false);
}
success = true;
} finally {
if (!success) {
@ -1291,9 +1296,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
}
}
}
if (anySegmentFlushed) {
maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "updateDocuments");
}
@ -1313,7 +1315,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
public void deleteDocuments(Term term) throws IOException {
ensureOpen();
try {
docWriter.deleteTerms(term);
if (docWriter.deleteTerms(term)) {
processEvents(true, false);
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Term)");
}
@ -1412,7 +1416,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
public void deleteDocuments(Term... terms) throws IOException {
ensureOpen();
try {
docWriter.deleteTerms(terms);
if (docWriter.deleteTerms(terms)) {
processEvents(true, false);
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Term..)");
}
@ -1432,7 +1438,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
public void deleteDocuments(Query query) throws IOException {
ensureOpen();
try {
docWriter.deleteQueries(query);
if (docWriter.deleteQueries(query)) {
processEvents(true, false);
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Query)");
}
@ -1454,7 +1462,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
public void deleteDocuments(Query... queries) throws IOException {
ensureOpen();
try {
docWriter.deleteQueries(queries);
if (docWriter.deleteQueries(queries)) {
processEvents(true, false);
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Query..)");
}
@ -1505,9 +1515,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
ensureOpen();
try {
boolean success = false;
boolean anySegmentFlushed = false;
try {
anySegmentFlushed = docWriter.updateDocument(doc, analyzer, term);
if (docWriter.updateDocument(doc, analyzer, term)) {
processEvents(true, false);
}
success = true;
} finally {
if (!success) {
@ -1516,10 +1527,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
}
}
}
if (anySegmentFlushed) {
maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "updateDocument");
}
@ -1730,7 +1737,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
// complete
ensureOpen();
}
// NOTE: in the ConcurrentMergeScheduler case, when
// doWait is false, we can return immediately while
// background threads accomplish the merging
@ -2009,8 +2015,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
mergeScheduler.close();
bufferedDeletesStream.clear();
processEvents(false, true);
docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
docWriter.abort(); // don't sync on IW here
docWriter.abort(this); // don't sync on IW here
synchronized(this) {
if (pendingCommit != null) {
@ -2102,7 +2109,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
* sure it's just like a fresh index.
*/
try {
docWriter.lockAndAbortAll();
docWriter.lockAndAbortAll(this);
processEvents(false, true);
synchronized (this) {
try {
// Abort any running merges
@ -2135,7 +2143,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
}
}
} finally {
docWriter.unlockAllAfterAbortAll();
docWriter.unlockAllAfterAbortAll(this);
}
}
}
@ -2243,33 +2251,40 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
* Atomically adds the segment private delete packet and publishes the flushed
* segments SegmentInfo to the index writer.
*/
synchronized void publishFlushedSegment(SegmentInfoPerCommit newSegment,
void publishFlushedSegment(SegmentInfoPerCommit newSegment,
FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
// Lock order IW -> BDS
synchronized (bufferedDeletesStream) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "publishFlushedSegment");
try {
synchronized (this) {
// Lock order IW -> BDS
synchronized (bufferedDeletesStream) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "publishFlushedSegment");
}
if (globalPacket != null && globalPacket.any()) {
bufferedDeletesStream.push(globalPacket);
}
// Publishing the segment must be synched on IW -> BDS to make sure
// that no merge prunes away the seg. private delete packet
final long nextGen;
if (packet != null && packet.any()) {
nextGen = bufferedDeletesStream.push(packet);
} else {
// Since we don't have a delete packet to apply we can get a new
// generation right away
nextGen = bufferedDeletesStream.getNextGen();
}
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
}
newSegment.setBufferedDeletesGen(nextGen);
segmentInfos.add(newSegment);
checkpoint();
}
}
if (globalPacket != null && globalPacket.any()) {
bufferedDeletesStream.push(globalPacket);
}
// Publishing the segment must be synched on IW -> BDS to make sure
// that no merge prunes away the seg. private delete packet
final long nextGen;
if (packet != null && packet.any()) {
nextGen = bufferedDeletesStream.push(packet);
} else {
// Since we don't have a delete packet to apply we can get a new
// generation right away
nextGen = bufferedDeletesStream.getNextGen();
}
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
}
newSegment.setBufferedDeletesGen(nextGen);
segmentInfos.add(newSegment);
checkpoint();
} finally {
flushCount.incrementAndGet();
doAfterFlush();
}
}
@ -2705,12 +2720,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
boolean flushSuccess = false;
boolean success = false;
try {
anySegmentsFlushed = docWriter.flushAllThreads();
anySegmentsFlushed = docWriter.flushAllThreads(this);
if (!anySegmentsFlushed) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
flushCount.incrementAndGet();
}
processEvents(false, true);
flushSuccess = true;
synchronized(this) {
@ -2750,7 +2766,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
} catch (OutOfMemoryError oom) {
handleOOM(oom, "prepareCommit");
}
boolean success = false;
try {
if (anySegmentsFlushed) {
@ -2765,7 +2781,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
}
}
}
startCommit(toCommit);
}
}
@ -2950,10 +2966,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
synchronized (fullFlushLock) {
boolean flushSuccess = false;
try {
anySegmentFlushed = docWriter.flushAllThreads();
anySegmentFlushed = docWriter.flushAllThreads(this);
flushSuccess = true;
} finally {
docWriter.finishFullFlush(flushSuccess);
processEvents(false, true);
}
}
synchronized(this) {
@ -4307,4 +4324,65 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
synchronized final void flushFailed(SegmentInfo info) throws IOException {
deleter.refresh(info.name);
}
final int purge(boolean forced) throws IOException {
return docWriter.purgeBuffer(this, forced);
}
final void applyDeletesAndPurge(boolean forcePurge) throws IOException {
try {
purge(forcePurge);
} finally {
applyAllDeletes();
flushCount.incrementAndGet();
}
}
final void doAfterSegmentFlushed(boolean triggerMerge, boolean forcePurge) throws IOException {
try {
purge(forcePurge);
} finally {
if (triggerMerge) {
maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
}
}
private boolean processEvents(boolean triggerMerge, boolean forcePurge) throws IOException {
return processEvents(eventQueue, triggerMerge, forcePurge);
}
private boolean processEvents(Queue<Event> queue, boolean triggerMerge, boolean forcePurge) throws IOException {
Event event;
boolean processed = false;
while((event = queue.poll()) != null) {
processed = true;
event.process(this, triggerMerge, forcePurge);
}
return processed;
}
/**
* Interface for internal atomic events. See {@link DocumentsWriter} for details. Events are executed concurrently and no order is guaranteed.
* Each event should only rely on the serializability within its process method. All actions that must happen before or after a certain action must be
* encoded inside the {@link #process(IndexWriter, boolean, boolean)} method.
*
*/
static interface Event {
/**
* Processes the event. This method is called by the {@link IndexWriter}
* passed as the first argument.
*
* @param writer
* the {@link IndexWriter} that executes the event.
* @param triggerMerge
* <code>false</code> iff this event should not trigger any segment merges
* @param clearBuffers
* <code>true</code> iff this event should clear all buffers associated with the event.
* @throws IOException
* if an {@link IOException} occurs
*/
void process(IndexWriter writer, boolean triggerMerge, boolean clearBuffers) throws IOException;
}
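For illustration, a concrete event could be as small as the following nested class. This is a hedged sketch only: the actual event implementations live in DocumentsWriter and are not part of this hunk, and the sketch merely assumes the applyDeletesAndPurge(boolean) helper added above.
  // Hypothetical sketch of a concrete Event; the real event classes are
  // declared in DocumentsWriter and may differ in shape.
  static final class ApplyDeletesEvent implements Event {
    @Override
    public void process(IndexWriter writer, boolean triggerMerge, boolean clearBuffers) throws IOException {
      // Delegate to the helper introduced in this patch; here clearBuffers
      // doubles as the forcePurge flag.
      writer.applyDeletesAndPurge(clearBuffers);
    }
  }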
}

View File

@ -281,7 +281,10 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates();
long bytesUsed = 0;
while (allActiveThreads.hasNext()) {
bytesUsed += allActiveThreads.next().dwpt.bytesUsed();
ThreadState next = allActiveThreads.next();
if (next.dwpt != null) {
bytesUsed += next.dwpt.bytesUsed();
}
}
assertEquals(bytesUsed, flushControl.activeBytes());
}

View File

@ -1702,7 +1702,6 @@ public class TestIndexWriter extends LuceneTestCase {
w.deleteAll();
w.commit();
// Make sure we accumulate no files except for empty
// segments_N and segments.gen:
assertTrue(d.listAll().length <= 2);

View File

@ -83,7 +83,7 @@ public class TestIndexWriterForceMerge extends LuceneTestCase {
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer(random()))
.setMaxBufferedDocs(2).setMergePolicy(ldmp).setMergeScheduler(new ConcurrentMergeScheduler()));
for(int iter=0;iter<10;iter++) {
for(int i=0;i<19;i++)
writer.addDocument(doc);
@ -96,7 +96,6 @@ public class TestIndexWriterForceMerge extends LuceneTestCase {
sis.read(dir);
final int segCount = sis.size();
writer.forceMerge(7);
writer.commit();
writer.waitForMerges();
@ -108,7 +107,7 @@ public class TestIndexWriterForceMerge extends LuceneTestCase {
if (segCount < 7)
assertEquals(segCount, optSegCount);
else
assertEquals(7, optSegCount);
assertEquals("seg: " + segCount, 7, optSegCount);
}
writer.close();
dir.close();

View File

@ -91,7 +91,9 @@ New Features
* SOLR-4808: Persist and use router,replicationFactor and maxShardsPerNode at Collection and Shard level (Noble Paul, Shalin Mangar)
* SOLR-5006: CREATESHARD command for 'implicit' shards (Noble Paul)
* SOLR-5017: Allow sharding based on the value of a field (Noble Paul)
* SOLR-4222:create custom sharded collection via collections API (Noble Paul)
* SOLR-4222: create custom sharded collection via collections API (Noble Paul)
* SOLR-4718: Allow solr.xml to be stored in ZooKeeper
* SOLR-5156: Enhance ZkCLI to allow uploading of arbitrary files to ZK.
Bug Fixes
----------------------
@ -130,7 +132,14 @@ Bug Fixes
of divide by zero, and makes estimated hit counts meaningful in non-optimized
indexes. (hossman)
* SOLR-5164: Can not create a collection via collections API (cloud mode) (Erick Erickson)
* SOLR-3936: Fixed QueryElevationComponent sorting when used with Grouping
(Michael Garski via hossman)
* SOLR-5171: SOLR Admin gui works in IE9, breaks in IE10. (Joseph L Howard via
steffkes)
* SOLR-5174: Admin UI - Query View doesn't highlight (json) Result if it
contains HTML Tags (steffkes)
Optimizations
----------------------
@ -148,8 +157,8 @@ Other Changes
The solr.clustering.enabled system property is set to 'true' by default.
(ehatcher, Dawid Weiss)
* SOLR-4914: Factor out core list persistence and discovery into a
new CoresLocator interface. (Alan Woodward)
* SOLR-4914, SOLR-5162: Factor out core list persistence and discovery into a
new CoresLocator interface. (Alan Woodward, Shawn Heisey)
* SOLR-5056: Improve type safety of ConfigSolr class. (Alan Woodward)

View File

@ -26,7 +26,6 @@ import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.net.URL;
import java.net.MalformedURLException;
@ -499,6 +498,10 @@ public class JettySolrRunner {
public void setCoreNodeName(String coreNodeName) {
this.coreNodeName = coreNodeName;
}
public String getSolrHome() {
return solrHome;
}
}
class NoLog implements Logger {

View File

@ -3,6 +3,7 @@ package org.apache.solr.cloud;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.io.IOUtils;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@ -19,7 +20,9 @@ import org.xml.sax.SAXException;
import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.TimeoutException;
@ -44,6 +47,7 @@ public class ZkCLI {
private static final String MAKEPATH = "makepath";
private static final String PUT = "put";
private static final String PUT_FILE = "putfile";
private static final String DOWNCONFIG = "downconfig";
private static final String ZK_CLI_NAME = "ZkCLI";
private static final String HELP = "help";
@ -87,7 +91,8 @@ public class ZkCLI {
.hasArg(true)
.withDescription(
"cmd to run: " + BOOTSTRAP + ", " + UPCONFIG + ", " + DOWNCONFIG
+ ", " + LINKCONFIG + ", " + MAKEPATH + ", "+ PUT + ", "+ LIST + ", " + CLEAR).create(CMD));
+ ", " + LINKCONFIG + ", " + MAKEPATH + ", " + PUT + ", " + PUT_FILE + ","
+ LIST + ", " + CLEAR).create(CMD));
Option zkHostOption = new Option("z", ZKHOST, true,
"ZooKeeper host address");
@ -131,6 +136,7 @@ public class ZkCLI {
System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + LINKCONFIG + " -" + COLLECTION + " collection1" + " -" + CONFNAME + " myconf");
System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + MAKEPATH + " /apache/solr");
System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + PUT + " /solr.conf 'conf data'");
System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + PUT_FILE + " /solr.xml /User/myuser/solr/solr.xml");
System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + CLEAR + " /solr");
System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + LIST);
return;
@ -244,6 +250,20 @@ public class ZkCLI {
}
zkClient.create(arglist.get(0).toString(), arglist.get(1).toString().getBytes("UTF-8"),
acl, CreateMode.PERSISTENT, true);
} else if (line.getOptionValue(CMD).equals(PUT_FILE)) {
List arglist = line.getArgList();
if (arglist.size() != 2) {
System.out.println("-" + PUT_FILE + " requires two args - the path to create in ZK and the path to the local file");
System.exit(1);
}
InputStream is = new FileInputStream(arglist.get(1).toString());
try {
zkClient.create(arglist.get(0).toString(), IOUtils.toByteArray(is),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
} finally {
IOUtils.closeQuietly(is);
}
}
} finally {
if (solrPort != null) {

View File

@ -61,10 +61,7 @@ public abstract class ConfigSolr {
else {
inputStream = new FileInputStream(configFile);
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ByteStreams.copy(inputStream, baos);
String originalXml = IOUtils.toString(new ByteArrayInputStream(baos.toByteArray()), "UTF-8");
return fromInputStream(loader, new ByteArrayInputStream(baos.toByteArray()), configFile, originalXml);
return fromInputStream(loader, inputStream);
}
catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@ -76,13 +73,17 @@ public abstract class ConfigSolr {
}
public static ConfigSolr fromString(String xml) {
return fromInputStream(null, new ByteArrayInputStream(xml.getBytes(Charsets.UTF_8)), null, xml);
return fromInputStream(null, new ByteArrayInputStream(xml.getBytes(Charsets.UTF_8)));
}
public static ConfigSolr fromInputStream(SolrResourceLoader loader, InputStream is, File file, String originalXml) {
public static ConfigSolr fromInputStream(SolrResourceLoader loader, InputStream is) {
try {
Config config = new Config(loader, null, new InputSource(is), null, false);
return fromConfig(config, file, originalXml);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ByteStreams.copy(is, baos);
String originalXml = IOUtils.toString(new ByteArrayInputStream(baos.toByteArray()), "UTF-8");
ByteArrayInputStream dup = new ByteArrayInputStream(baos.toByteArray());
Config config = new Config(loader, null, new InputSource(dup), null, false);
return fromConfig(config, originalXml);
}
catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@ -93,9 +94,9 @@ public abstract class ConfigSolr {
return fromFile(loader, new File(solrHome, SOLR_XML_FILE));
}
public static ConfigSolr fromConfig(Config config, File file, String originalXml) {
public static ConfigSolr fromConfig(Config config, String originalXml) {
boolean oldStyle = (config.getNode("solr/cores", false) != null);
return oldStyle ? new ConfigSolrXmlOld(config, file, originalXml)
return oldStyle ? new ConfigSolrXmlOld(config, originalXml)
: new ConfigSolrXml(config);
}

View File

@ -55,15 +55,15 @@ public class ConfigSolrXmlOld extends ConfigSolr {
return "solr/cores/shardHandlerFactory";
}
public ConfigSolrXmlOld(Config config, File configFile, String originalXML) {
public ConfigSolrXmlOld(Config config, String originalXML) {
super(config);
try {
checkForIllegalConfig();
fillPropMap();
config.substituteProperties();
initCoreList();
this.persistor = isPersistent() ? new SolrXMLCoresLocator(configFile, originalXML, this)
: new SolrXMLCoresLocator.NonPersistingLocator(configFile, originalXML, this);
this.persistor = isPersistent() ? new SolrXMLCoresLocator(originalXML, this)
: new SolrXMLCoresLocator.NonPersistingLocator(originalXML, this);
}
catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);

View File

@ -480,7 +480,7 @@ public class CoreContainer {
SolrResourceLoader solrLoader = null;
SolrConfig config = null;
solrLoader = new SolrResourceLoader(instanceDir, loader.getClassLoader(), dcore.getCoreProperties());
solrLoader = new SolrResourceLoader(instanceDir, loader.getClassLoader(), dcore.getSubstitutableProperties());
try {
config = new SolrConfig(solrLoader, dcore.getConfigName(), null);
} catch (Exception e) {
@ -646,7 +646,7 @@ public class CoreContainer {
SolrResourceLoader solrLoader;
if(zkSys.getZkController() == null) {
solrLoader = new SolrResourceLoader(instanceDir.getAbsolutePath(), loader.getClassLoader(),
cd.getCoreProperties());
cd.getSubstitutableProperties());
} else {
try {
String collection = cd.getCloudDescriptor().getCollectionName();
@ -659,7 +659,7 @@ public class CoreContainer {
"Could not find config name for collection:" + collection);
}
solrLoader = new ZkSolrResourceLoader(instanceDir.getAbsolutePath(), zkConfigName, loader.getClassLoader(),
cd.getCoreProperties(), zkSys.getZkController());
cd.getSubstitutableProperties(), zkSys.getZkController());
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,

View File

@ -119,6 +119,9 @@ public class CoreDescriptor {
/** The properties for this core, as available through getProperty() */
protected final Properties coreProperties = new Properties();
/** The properties for this core, substitutable by resource loaders */
protected final Properties substitutableProperties = new Properties();
/**
* Create a new CoreDescriptor.
* @param container the CoreDescriptor's container
@ -160,6 +163,7 @@ public class CoreDescriptor {
}
loadExtraProperties();
buildSubstitutableProperties();
// TODO maybe make this a CloudCoreDescriptor subclass?
if (container.isZooKeeperAware()) {
@ -201,6 +205,20 @@ public class CoreDescriptor {
}
}
/**
* Create the properties object used by resource loaders, etc, for property
* substitution. The default solr properties are prefixed with 'solr.core.', so,
* e.g., 'name' becomes 'solr.core.name'
*/
protected void buildSubstitutableProperties() {
for (String propName : coreProperties.stringPropertyNames()) {
String propValue = coreProperties.getProperty(propName);
if (!isUserDefinedProperty(propName))
propName = "solr.core." + propName;
substitutableProperties.setProperty(propName, propValue);
}
}
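As a standalone illustration of the prefixing rule documented above, the sketch below uses made-up property names and approximates CoreDescriptor.isUserDefinedProperty() with a simple check; it is not part of the patch.
import java.util.Properties;
// Demonstrates how reserved core properties pick up the "solr.core." prefix
// while user-defined ones are passed through unchanged.
public class SubstitutablePropsDemo {
  public static void main(String[] args) {
    Properties coreProperties = new Properties();
    coreProperties.setProperty("name", "collection1"); // reserved property
    coreProperties.setProperty("myProp", "foo");       // user-defined property
    Properties substitutable = new Properties();
    for (String propName : coreProperties.stringPropertyNames()) {
      String propValue = coreProperties.getProperty(propName);
      boolean userDefined = !"name".equals(propName);  // stand-in for isUserDefinedProperty()
      substitutable.setProperty(userDefined ? propName : "solr.core." + propName, propValue);
    }
    System.out.println(substitutable); // e.g. {myProp=foo, solr.core.name=collection1}
  }
}
The ${solr.core.name} reference in the minimal solrconfig further down this diff, together with TestImplicitCoreProperties, exercises exactly this substitution.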
protected File resolvePaths(String filepath) {
File file = new File(filepath);
if (file.isAbsolute())
@ -336,11 +354,11 @@ public class CoreDescriptor {
}
/**
* Returns all properties defined on this CoreDescriptor
* @return all properties defined on this CoreDescriptor
* Returns all substitutable properties defined on this CoreDescriptor
* @return all substitutable properties defined on this CoreDescriptor
*/
public Properties getCoreProperties() {
return coreProperties;
public Properties getSubstitutableProperties() {
return substitutableProperties;
}
@Override

View File

@ -2241,6 +2241,7 @@ public final class SolrCore implements SolrInfoMBean {
lst.add("coreName", name==null ? "(null)" : name);
lst.add("startTime", new Date(startTime));
lst.add("refCount", getOpenCount());
lst.add("instanceDir", resourceLoader.getInstanceDir());
lst.add("indexDir", getIndexDir());
CoreDescriptor cd = getCoreDescriptor();

View File

@ -41,7 +41,6 @@ public class SolrXMLCoresLocator implements CoresLocator {
private static final Logger logger = LoggerFactory.getLogger(SolrXMLCoresLocator.class);
private final File file;
private final String solrXmlTemplate;
private final ConfigSolrXmlOld cfg;
@ -50,13 +49,11 @@ public class SolrXMLCoresLocator implements CoresLocator {
/**
* Create a new SolrXMLCoresLocator
* @param file a File object representing the file to write out to
* @param originalXML the original content of the solr.xml file
* @param cfg the CoreContainer's config object
*/
public SolrXMLCoresLocator(File file, String originalXML, ConfigSolrXmlOld cfg) {
public SolrXMLCoresLocator(String originalXML, ConfigSolrXmlOld cfg) {
this.solrXmlTemplate = buildTemplate(originalXML);
this.file = file;
this.cfg = cfg;
}
@ -147,6 +144,7 @@ public class SolrXMLCoresLocator implements CoresLocator {
}
protected void doPersist(String xml) {
File file = new File(cfg.config.getResourceLoader().getInstanceDir(), ConfigSolr.SOLR_XML_FILE);
try {
Writer writer = new OutputStreamWriter(new FileOutputStream(file), Charsets.UTF_8);
writer.write(xml);
@ -204,8 +202,8 @@ public class SolrXMLCoresLocator implements CoresLocator {
public static class NonPersistingLocator extends SolrXMLCoresLocator {
public NonPersistingLocator(File file, String originalXML, ConfigSolrXmlOld cfg) {
super(file, originalXML, cfg);
public NonPersistingLocator(String originalXML, ConfigSolrXmlOld cfg) {
super(originalXML, cfg);
this.xml = originalXML;
}

View File

@ -227,7 +227,7 @@ public class ZkContainer {
"Could not find config name for collection:" + collection);
}
solrLoader = new ZkSolrResourceLoader(instanceDir, zkConfigName,
loader.getClassLoader(), dcore.getCoreProperties(), zkController);
loader.getClassLoader(), dcore.getSubstitutableProperties(), zkController);
config = getSolrConfigFromZk(zkConfigName, dcore.getConfigName(),
solrLoader);
schema = IndexSchemaFactory.buildIndexSchema(dcore.getSchemaName(),

View File

@ -410,7 +410,8 @@ public class CoreAdminHandler extends RequestHandlerBase {
"Missing parameter [" + CoreAdminParams.NAME + "]");
String instancedir = params.get(CoreAdminParams.INSTANCE_DIR);
if (StringUtils.isEmpty(instancedir)) {
instancedir = name; // Already relative to solrHome, we haven't been given an absolute path.
instancedir = name; // will be resolved later against solr.home
//instancedir = container.getSolrHome() + "/" + name;
}
Properties coreProps = new Properties();

View File

@ -44,6 +44,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.QueryElevationParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.search.grouping.GroupingSpecification;
import org.apache.solr.util.DOMUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
@ -424,23 +425,25 @@ public class QueryElevationComponent extends SearchComponent implements SolrCore
}));
} else {
// Check if the sort is based on score
boolean modify = false;
SortField[] current = sortSpec.getSort().getSort();
ArrayList<SortField> sorts = new ArrayList<SortField>(current.length + 1);
// Perhaps force it to always sort by score
if (force && current[0].getType() != SortField.Type.SCORE) {
sorts.add(new SortField("_elevate_", comparator, true));
modify = true;
Sort modified = this.modifySort(current, force, comparator);
if(modified != null) {
sortSpec.setSort(modified);
}
for (SortField sf : current) {
if (sf.getType() == SortField.Type.SCORE) {
sorts.add(new SortField("_elevate_", comparator, !sf.getReverse()));
modify = true;
}
sorts.add(sf);
}
// alter the sorting in the grouping specification if there is one
GroupingSpecification groupingSpec = rb.getGroupingSpec();
if(groupingSpec != null) {
SortField[] groupSort = groupingSpec.getGroupSort().getSort();
Sort modGroupSort = this.modifySort(groupSort, force, comparator);
if(modGroupSort != null) {
groupingSpec.setGroupSort(modGroupSort);
}
if (modify) {
sortSpec.setSort(new Sort(sorts.toArray(new SortField[sorts.size()])));
SortField[] withinGroupSort = groupingSpec.getSortWithinGroup().getSort();
Sort modWithinGroupSort = this.modifySort(withinGroupSort, force, comparator);
if(modWithinGroupSort != null) {
groupingSpec.setSortWithinGroup(modWithinGroupSort);
}
}
}
@ -466,6 +469,25 @@ public class QueryElevationComponent extends SearchComponent implements SolrCore
}
}
private Sort modifySort(SortField[] current, boolean force, ElevationComparatorSource comparator) {
boolean modify = false;
ArrayList<SortField> sorts = new ArrayList<SortField>(current.length + 1);
// Perhaps force it to always sort by score
if (force && current[0].getType() != SortField.Type.SCORE) {
sorts.add(new SortField("_elevate_", comparator, true));
modify = true;
}
for (SortField sf : current) {
if (sf.getType() == SortField.Type.SCORE) {
sorts.add(new SortField("_elevate_", comparator, !sf.getReverse()));
modify = true;
}
sorts.add(sf);
}
return modify ? new Sort(sorts.toArray(new SortField[sorts.size()])) : null;
}
@Override
public void process(ResponseBuilder rb) throws IOException {
// Do nothing -- the real work is modifying the input query

View File

@ -18,12 +18,14 @@
package org.apache.solr.servlet;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@ -35,9 +37,11 @@ import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.ConfigSolr;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.handler.ContentStreamHandlerBase;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryRequestBase;
@ -60,6 +64,7 @@ import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -131,12 +136,44 @@ public class SolrDispatchFilter implements Filter
log.info("SolrDispatchFilter.init() done");
}
private ConfigSolr loadConfigSolr(SolrResourceLoader loader) {
String solrxmlLocation = System.getProperty("solr.solrxml.location", "solrhome");
if (solrxmlLocation == null || "solrhome".equalsIgnoreCase(solrxmlLocation))
return ConfigSolr.fromSolrHome(loader, loader.getInstanceDir());
if ("zookeeper".equalsIgnoreCase(solrxmlLocation)) {
String zkHost = System.getProperty("zkHost");
log.info("Trying to read solr.xml from " + zkHost);
if (StringUtils.isEmpty(zkHost))
throw new SolrException(ErrorCode.SERVER_ERROR,
"Could not load solr.xml from zookeeper: zkHost system property not set");
SolrZkClient zkClient = new SolrZkClient(zkHost, 30000);
try {
if (!zkClient.exists("/solr.xml", true))
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not load solr.xml from zookeeper: node not found");
byte[] data = zkClient.getData("/solr.xml", null, null, true);
return ConfigSolr.fromInputStream(loader, new ByteArrayInputStream(data));
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not load solr.xml from zookeeper", e);
} finally {
zkClient.close();
}
}
throw new SolrException(ErrorCode.SERVER_ERROR,
"Bad solr.solrxml.location set: " + solrxmlLocation + " - should be 'solrhome' or 'zookeeper'");
}
/**
* Override this to change CoreContainer initialization
* @return a CoreContainer to hold this server's cores
*/
protected CoreContainer createCoreContainer() {
CoreContainer cores = new CoreContainer();
SolrResourceLoader loader = new SolrResourceLoader(SolrResourceLoader.locateSolrHome());
ConfigSolr config = loadConfigSolr(loader);
CoreContainer cores = new CoreContainer(loader, config);
cores.load();
return cores;
}

View File

@ -0,0 +1,75 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- For testing, I need to create some custom directories on the fly, particularly for some of the new
discovery-based core configuration. Trying a minimal configuration to cut down the setup time.
Use in conjunction with schema-minimal.xml, perhaps. -->
<config>
<luceneMatchVersion>LUCENE_41</luceneMatchVersion>
<dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
<xi:include href="./solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
<jmx/>
<updateHandler class="solr.DirectUpdateHandler2">
<!--updateLog>
<str name="dir">${solr.ulog.dir:}</str>
</updateLog-->
</updateHandler>
<query>
<enableLazyFieldLoading>true</enableLazyFieldLoading>
<queryResultWindowSize>20</queryResultWindowSize>
<queryResultMaxDocsCached>20</queryResultMaxDocsCached>
<useColdSearcher>true</useColdSearcher>
<maxWarmingSearchers>1</maxWarmingSearchers>
</query>
<requestHandler name="/admin/" class="solr.admin.AdminHandlers" />
<requestDispatcher handleSelect="false">
<httpCaching never304="true"/>
</requestDispatcher>
<requestHandler name="/select" class="solr.SearchHandler">
<lst name="defaults">
<str name="echoParams">all</str>
<str name="df">text</str>
<str name="dummyParam">${solr.core.name}</str>
</lst>
</requestHandler>
<requestHandler name="/update" class="solr.UpdateRequestHandler">
</requestHandler>
<queryResponseWriter name="json" class="solr.JSONResponseWriter">
<!-- For the purposes of the tutorial, JSON responses are written as
plain text so that they are easy to read in *any* browser.
If you expect a MIME type of "application/json" just remove this override.
-->
<str name="content-type">text/plain; charset=UTF-8</str>
</queryResponseWriter>
</config>

View File

@ -30,7 +30,7 @@
-->
<cores adminPath="/admin/cores" defaultCoreName="collection1" host="127.0.0.1" hostPort="${hostPort:8983}"
hostContext="${hostContext:solr}" zkClientTimeout="${solr.zkclienttimeout:30000}" numShards="${numShards:3}" shareSchema="${shareSchema:false}"
genericCoreNodeNames="${genericCoreNodeNames:true}"
genericCoreNodeNames="${genericCoreNodeNames:true}" leaderVoteWait="0"
distribUpdateConnTimeout="${distribUpdateConnTimeout:15000}" distribUpdateSoTimeout="${distribUpdateSoTimeout:120000}">
<core name="collection1" instanceDir="collection1" shard="${shard:}" collection="${collection:collection1}" config="${solrconfig:solrconfig.xml}" schema="${schema:schema.xml}"
coreNodeName="${coreNodeName:}"/>

View File

@ -17,12 +17,39 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util._TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer.RemoteSolrException;
@ -46,8 +73,10 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoMBean.Category;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.SolrCmdDistributor.Request;
@ -55,32 +84,6 @@ import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Before;
import org.junit.BeforeClass;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
/**
* Tests the Cloud Collections API.
*/
@ -500,6 +503,8 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
waitForRecoveriesToFinish("awholynewcollection_" + j, zkStateReader, false);
}
checkInstanceDirs(jettys.get(0));
List<String> collectionNameList = new ArrayList<String>();
collectionNameList.addAll(collectionInfos.keySet());
String collectionName = collectionNameList.get(random().nextInt(collectionNameList.size()));
@ -658,6 +663,24 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
checkNoTwoShardsUseTheSameIndexDir();
}
private void checkInstanceDirs(JettySolrRunner jetty) {
CoreContainer cores = ((SolrDispatchFilter) jetty.getDispatchFilter()
.getFilter()).getCores();
Collection<SolrCore> theCores = cores.getCores();
for (SolrCore core : theCores) {
// look for core props file
assertTrue("Could not find expected core.properties file",
new File((String) core.getStatistics().get("instanceDir"),
"core.properties").exists());
assertEquals(
SolrResourceLoader.normalizeDir(jetty.getSolrHome() + File.separator
+ core.getName()),
SolrResourceLoader.normalizeDir((String) core.getStatistics().get(
"instanceDir")));
}
}
private boolean waitForReloads(String collectionName, Map<String,Long> urlToTimeBefore) throws SolrServerException, IOException {

View File

@ -0,0 +1,233 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
import com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule;
import com.google.common.base.Charsets;
import org.apache.commons.io.FileUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.ConfigSolr;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
public class SolrXmlInZkTest extends SolrTestCaseJ4 {
@Rule
public TestRule solrTestRules = RuleChain.outerRule(new SystemPropertiesRestoreRule());
protected ZkTestServer zkServer;
protected String zkDir;
private SolrZkClient zkClient;
private ZkStateReader reader;
private static int PORT = 7000;
private ConfigSolr cfg;
@Before
public void beforeClass() {
System.setProperty("solr.solrxml.location", "zookeeper");
}
private void setUpZkAndDiskXml(boolean toZk, boolean leaveOnLocal) throws Exception {
createTempDir();
File solrHome = new File(dataDir, "home");
copyMinConf(new File(solrHome, "myCollect"));
if (leaveOnLocal) {
FileUtils.copyFile(new File(SolrTestCaseJ4.TEST_HOME(), "solr-stress-new.xml"), new File(solrHome, "solr.xml"));
}
System.setProperty("solr.solr.home", solrHome.getAbsolutePath());
ignoreException("No UpdateLog found - cannot sync");
ignoreException("No UpdateLog found - cannot recover");
System.setProperty("zkClientTimeout", "8000");
zkDir = dataDir.getAbsolutePath() + File.separator
+ "zookeeper" + System.currentTimeMillis() + "/server1/data";
zkServer = new ZkTestServer(zkDir);
zkServer.run();
System.setProperty("zkHost", zkServer.getZkAddress());
AbstractZkTestCase.buildZooKeeper(zkServer.getZkHost(),
zkServer.getZkAddress(), "solrconfig.xml", "schema.xml");
zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT);
if (toZk) {
zkClient.makePath("solr.xml", XML_FOR_ZK.getBytes(Charsets.UTF_8), true);
}
zkClient.close();
log.info("####SETUP_START " + getTestName());
// set some system properties for use by tests
System.setProperty("solr.test.sys.prop1", "propone");
System.setProperty("solr.test.sys.prop2", "proptwo");
Method method = SolrDispatchFilter.class.getDeclaredMethod("loadConfigSolr", SolrResourceLoader.class);
method.setAccessible(true);
Object obj = method.invoke(new SolrDispatchFilter(), new SolrResourceLoader(null));
cfg = (ConfigSolr) obj;
log.info("####SETUP_END " + getTestName());
}
private void closeZK() throws Exception {
if (zkClient != null) {
zkClient.close();
}
if (reader != null) {
reader.close();
}
zkServer.shutdown();
}
@Test
public void testXmlOnBoth() throws Exception {
try {
setUpZkAndDiskXml(true, true);
assertEquals("Should have gotten a new port the xml file sent to ZK, overrides the copy on disk",
cfg.getZkHostPort(), "9045");
} finally {
closeZK();
}
}
@Test
public void testXmlInZkOnly() throws Exception {
try {
setUpZkAndDiskXml(true, false);
assertEquals("Should have gotten a new port the xml file sent to ZK",
cfg.getZkHostPort(), "9045");
} finally {
closeZK();
}
}
@Test
public void testNotInZkAndShouldBe() throws Exception {
try {
setUpZkAndDiskXml(false, true);
fail("Should have gotten an exception here!");
} catch (InvocationTargetException ite) {
SolrException se = (SolrException) ite.getTargetException();
assertEquals("Should have an exception here, file not in ZK.",
"Could not load solr.xml from zookeeper", se.getMessage());
} finally {
closeZK();
}
}
// TODO: Solr 5.0. when we remove the default solr.xml from configSolrXmlOld this should start failing.
@Test
public void testNotInZkOrOnDisk() throws Exception {
try {
System.clearProperty("solr.solrxml.location");
System.setProperty("hostPort", "8787");
setUpZkAndDiskXml(false, false); // solr.xml not on disk either
assertEquals("Should have gotten the default port from the hard-coded default solr.xml file via sys prop.",
cfg.getZkHostPort(), "8787");
} finally {
closeZK();
}
}
@Test
public void testOnDiskOnly() throws Exception {
try {
System.clearProperty("solr.solrxml.location");
setUpZkAndDiskXml(false, true);
assertEquals("Should have gotten the default port", cfg.getZkHostPort(), "8983");
} finally {
closeZK();
}
}
@Test
public void testBadSysProp() throws Exception {
try {
System.setProperty("solr.solrxml.location", "solrHomeDir");
setUpZkAndDiskXml(false, true);
fail("Should have thrown exception in SolrXmlInZkTest.testBadSysProp");
} catch (InvocationTargetException ite) {
SolrException se = (SolrException) ite.getTargetException();
assertEquals("Should have an exception in SolrXmlInZkTest.testBadSysProp, sysprop set to bogus value.",
se.getMessage(), "Bad solr.solrxml.location set: solrHomeDir - should be 'solrhome' or 'zookeeper'");
} finally {
closeZK();
}
}
//SolrDispatchFilter.protected static ConfigSolr loadConfigSolr(SolrResourceLoader loader) {
@Test
public void testZkHostDiscovery() throws ClassNotFoundException, NoSuchMethodException,
IllegalAccessException, InstantiationException, InvocationTargetException {
// Should see an error when zkHost is not defined but solr.solrxml.location is set to zookeeper.
System.clearProperty("zkHost");
try {
Method method = SolrDispatchFilter.class.getDeclaredMethod("loadConfigSolr", SolrResourceLoader.class);
method.setAccessible(true);
method.invoke(new SolrDispatchFilter(), new SolrResourceLoader(null));
fail("Should have thrown an exception");
} catch (InvocationTargetException ite) {
assertTrue("Should be catching a SolrException", ite.getTargetException() instanceof SolrException);
String cause = ((SolrException) ite.getTargetException()).getMessage();
assertEquals("Caught Solr exception", cause,
"Could not load solr.xml from zookeeper: zkHost system property not set");
}
}
// Just a random port, I'm not going to use it but just check that the Solr instance constructed from the XML
// file in ZK overrides the default port.
private final String XML_FOR_ZK =
"<solr>" +
" <solrcloud>" +
" <str name=\"host\">127.0.0.1</str>" +
" <int name=\"hostPort\">9045</int>" +
" <str name=\"hostContext\">${hostContext:solr}</str>" +
" </solrcloud>" +
" <shardHandlerFactory name=\"shardHandlerFactory\" class=\"HttpShardHandlerFactory\">" +
" <int name=\"socketTimeout\">${socketTimeout:120000}</int>" +
" <int name=\"connTimeout\">${connTimeout:15000}</int>" +
" </shardHandlerFactory>" +
"</solr>";
}

View File

@ -18,10 +18,14 @@ package org.apache.solr.cloud;
*/
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.filefilter.RegexFileFilter;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.solr.SolrTestCaseJ4;
@ -50,7 +54,8 @@ public class ZkCLITest extends SolrTestCaseJ4 {
private String solrHome;
private SolrZkClient zkClient;
protected static final String SOLR_HOME = SolrTestCaseJ4.TEST_HOME();
@BeforeClass
public static void beforeClass() {
@ -141,7 +146,7 @@ public class ZkCLITest extends SolrTestCaseJ4 {
@Test
public void testPut() throws Exception {
// test bootstrap_conf
// test put
String data = "my data";
String[] args = new String[] {"-zkhost", zkServer.getZkAddress(), "-cmd",
"put", "/data.txt", data};
@ -151,7 +156,41 @@ public class ZkCLITest extends SolrTestCaseJ4 {
assertArrayEquals(zkClient.getData("/data.txt", null, null, true), data.getBytes("UTF-8"));
}
@Test
public void testPutFile() throws Exception {
// test put file
String[] args = new String[] {"-zkhost", zkServer.getZkAddress(), "-cmd",
"putfile", "/solr.xml", SOLR_HOME + File.separator + "solr-stress-new.xml"};
ZkCLI.main(args);
String fromZk = new String(zkClient.getData("/solr.xml", null, null, true), "UTF-8");
File locFile = new File(SOLR_HOME + File.separator + "solr-stress-new.xml");
InputStream is = new FileInputStream(locFile);
String fromLoc;
try {
fromLoc = new String(IOUtils.toByteArray(is), "UTF-8");
} finally {
IOUtils.closeQuietly(is);
}
assertEquals("Should get back what we put in ZK", fromZk, fromLoc);
}
@Test
public void testPutFileNotExists() throws Exception {
// test put file
String[] args = new String[] {"-zkhost", zkServer.getZkAddress(), "-cmd",
"putfile", "/solr.xml", SOLR_HOME + File.separator + "not-there.xml"};
try {
ZkCLI.main(args);
fail("Should have had a file not found exception");
} catch (FileNotFoundException fne) {
String msg = fne.getMessage();
assertTrue("Didn't find expected error message containing 'not-there.xml' in " + msg,
msg.indexOf("not-there.xml") != -1);
}
}
@Test
public void testList() throws Exception {
zkClient.makePath("/test", true);

View File

@ -0,0 +1,41 @@
package org.apache.solr.core;
import org.apache.solr.SolrTestCaseJ4;
import org.junit.Test;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
public class TestImplicitCoreProperties extends SolrTestCaseJ4 {
public static final String SOLRXML =
"<solr><cores><core name=\"collection1\" instanceDir=\"collection1\" config=\"solrconfig-implicitproperties.xml\"/></cores></solr>";
@Test
public void testImplicitPropertiesAreSubstitutedInSolrConfig() {
CoreContainer cc = createCoreContainer(TEST_HOME(), SOLRXML);
try {
cc.load();
assertQ(req("q", "*:*"), "//str[@name='dummyParam'][.='collection1']");
}
finally {
cc.shutdown();
}
}
}

View File

@ -69,7 +69,7 @@ public class TestLazyCores extends SolrTestCaseJ4 {
FileUtils.write(solrXml, LOTS_SOLR_XML, IOUtils.CHARSET_UTF_8.toString());
ConfigSolrXmlOld config = (ConfigSolrXmlOld) ConfigSolr.fromFile(loader, solrXml);
CoresLocator locator = new SolrXMLCoresLocator.NonPersistingLocator(solrXml, LOTS_SOLR_XML, config);
CoresLocator locator = new SolrXMLCoresLocator.NonPersistingLocator(LOTS_SOLR_XML, config);
final CoreContainer cores = new CoreContainer(loader, config, locator);

View File

@ -35,7 +35,7 @@ public class TestSolrXmlPersistor {
final String solrxml = "<solr><cores adminHandler=\"/admin\"/></solr>";
SolrXMLCoresLocator persistor = new SolrXMLCoresLocator(new File("testfile.xml"), solrxml, null);
SolrXMLCoresLocator persistor = new SolrXMLCoresLocator(solrxml, null);
assertEquals(persistor.buildSolrXML(EMPTY_CD_LIST),
"<solr><cores adminHandler=\"/admin\"></cores></solr>");
@ -45,7 +45,7 @@ public class TestSolrXmlPersistor {
public void emptyCoresTagIsPersisted() {
final String solrxml = "<solr><cores adminHandler=\"/admin\"></cores></solr>";
SolrXMLCoresLocator persistor = new SolrXMLCoresLocator(new File("testfile.xml"), solrxml, null);
SolrXMLCoresLocator persistor = new SolrXMLCoresLocator(solrxml, null);
assertEquals(persistor.buildSolrXML(EMPTY_CD_LIST), "<solr><cores adminHandler=\"/admin\"></cores></solr>");
}
@ -53,7 +53,7 @@ public class TestSolrXmlPersistor {
public void emptySolrXmlIsPersisted() {
final String solrxml = "<solr></solr>";
SolrXMLCoresLocator persistor = new SolrXMLCoresLocator(new File("testfile.xml"), solrxml, null);
SolrXMLCoresLocator persistor = new SolrXMLCoresLocator(solrxml, null);
assertEquals(persistor.buildSolrXML(EMPTY_CD_LIST), "<solr><cores></cores></solr>");
}
@ -68,7 +68,7 @@ public class TestSolrXmlPersistor {
final CoreDescriptor cd = new CoreDescriptor(cc, "testcore", "instance/dir/");
List<CoreDescriptor> cds = ImmutableList.of(cd);
SolrXMLCoresLocator persistor = new SolrXMLCoresLocator(new File("testfile.xml"), solrxml, null);
SolrXMLCoresLocator persistor = new SolrXMLCoresLocator(solrxml, null);
assertEquals(persistor.buildSolrXML(cds),
"<solr><cores>" + SolrXMLCoresLocator.NEWLINE
+ " <core name=\"testcore\" instanceDir=\"instance/dir/\"/>" + SolrXMLCoresLocator.NEWLINE
@ -89,7 +89,7 @@ public class TestSolrXmlPersistor {
"</cores>" +
"</solr>";
SolrXMLCoresLocator locator = new SolrXMLCoresLocator(new File("testfile.xml"), solrxml, null);
SolrXMLCoresLocator locator = new SolrXMLCoresLocator(solrxml, null);
assertTrue(locator.getTemplate().contains("{{CORES_PLACEHOLDER}}"));
assertTrue(locator.getTemplate().contains("<shardHandlerFactory "));
assertTrue(locator.getTemplate().contains("${socketTimeout:500}"));
@ -107,15 +107,14 @@ public class TestSolrXmlPersistor {
"</cores>" +
"</solr>";
SolrXMLCoresLocator locator = new SolrXMLCoresLocator(new File("testfile.xml"), solrxml, null);
SolrXMLCoresLocator locator = new SolrXMLCoresLocator(solrxml, null);
assertTrue(locator.getTemplate().contains("{{CORES_PLACEHOLDER}}"));
assertTrue(locator.getTemplate().contains("<shardHandlerFactory "));
}
@Test
public void complexXmlIsParsed() {
SolrXMLCoresLocator locator = new SolrXMLCoresLocator(new File("testfile.xml"),
TestSolrXmlPersistence.SOLR_XML_LOTS_SYSVARS, null);
SolrXMLCoresLocator locator = new SolrXMLCoresLocator(TestSolrXmlPersistence.SOLR_XML_LOTS_SYSVARS, null);
assertTrue(locator.getTemplate().contains("{{CORES_PLACEHOLDER}}"));
}

View File

@ -22,6 +22,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.GroupParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.QueryElevationParams;
import org.apache.solr.util.FileUtils;
@ -105,6 +106,208 @@ public class QueryElevationComponentTest extends SolrTestCaseJ4 {
}
}
@Test
public void testGroupedQuery() throws Exception {
try {
init("schema11.xml");
clearIndex();
assertU(commit());
assertU(adoc("id", "1", "text", "XXXX XXXX", "str_s", "a"));
assertU(adoc("id", "2", "text", "XXXX AAAA", "str_s", "b"));
assertU(adoc("id", "3", "text", "ZZZZ", "str_s", "c"));
assertU(adoc("id", "4", "text", "XXXX ZZZZ", "str_s", "d"));
assertU(adoc("id", "5", "text", "ZZZZ ZZZZ", "str_s", "e"));
assertU(adoc("id", "6", "text", "AAAA AAAA AAAA", "str_s", "f"));
assertU(adoc("id", "7", "text", "AAAA AAAA ZZZZ", "str_s", "g"));
assertU(adoc("id", "8", "text", "XXXX", "str_s", "h"));
assertU(adoc("id", "9", "text", "YYYY ZZZZ", "str_s", "i"));
assertU(adoc("id", "22", "text", "XXXX ZZZZ AAAA", "str_s", "b"));
assertU(adoc("id", "66", "text", "XXXX ZZZZ AAAA", "str_s", "f"));
assertU(adoc("id", "77", "text", "XXXX ZZZZ AAAA", "str_s", "g"));
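// 22, 66 and 77 reuse the str_s values of docs 2, 6 and 7, so the six "AAAA" matches collapse into three groups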
assertU(commit());
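// XPath prefix shared by the grouped-response assertions below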
final String groups = "//arr[@name='groups']";
assertQ("non-elevated group query",
req(CommonParams.Q, "AAAA",
CommonParams.QT, "/elevate",
GroupParams.GROUP_FIELD, "str_s",
GroupParams.GROUP, "true",
GroupParams.GROUP_TOTAL_COUNT, "true",
GroupParams.GROUP_LIMIT, "100",
QueryElevationParams.ENABLE, "false",
CommonParams.FL, "id, score, [elevated]")
, "//*[@name='ngroups'][.='3']"
, "//*[@name='matches'][.='6']"
, groups +"/lst[1]//doc[1]/float[@name='id'][.='6.0']"
, groups +"/lst[1]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[1]//doc[2]/float[@name='id'][.='66.0']"
, groups +"/lst[1]//doc[2]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[2]//doc[1]/float[@name='id'][.='7.0']"
, groups +"/lst[2]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[2]//doc[2]/float[@name='id'][.='77.0']"
, groups +"/lst[2]//doc[2]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[3]//doc[1]/float[@name='id'][.='2.0']"
, groups +"/lst[3]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[3]//doc[2]/float[@name='id'][.='22.0']"
, groups +"/lst[3]//doc[2]/bool[@name='[elevated]'][.='false']"
);
assertQ("elevated group query",
req(CommonParams.Q, "AAAA",
CommonParams.QT, "/elevate",
GroupParams.GROUP_FIELD, "str_s",
GroupParams.GROUP, "true",
GroupParams.GROUP_TOTAL_COUNT, "true",
GroupParams.GROUP_LIMIT, "100",
CommonParams.FL, "id, score, [elevated]")
, "//*[@name='ngroups'][.='3']"
, "//*[@name='matches'][.='6']"
, groups +"/lst[1]//doc[1]/float[@name='id'][.='7.0']"
, groups +"/lst[1]//doc[1]/bool[@name='[elevated]'][.='true']"
, groups +"/lst[1]//doc[2]/float[@name='id'][.='77.0']"
, groups +"/lst[1]//doc[2]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[2]//doc[1]/float[@name='id'][.='6.0']"
, groups +"/lst[2]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[2]//doc[2]/float[@name='id'][.='66.0']"
, groups +"/lst[2]//doc[2]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[3]//doc[1]/float[@name='id'][.='2.0']"
, groups +"/lst[3]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[3]//doc[2]/float[@name='id'][.='22.0']"
, groups +"/lst[3]//doc[2]/bool[@name='[elevated]'][.='false']"
);
assertQ("non-elevated because sorted group query",
req(CommonParams.Q, "AAAA",
CommonParams.QT, "/elevate",
CommonParams.SORT, "id asc",
GroupParams.GROUP_FIELD, "str_s",
GroupParams.GROUP, "true",
GroupParams.GROUP_TOTAL_COUNT, "true",
GroupParams.GROUP_LIMIT, "100",
CommonParams.FL, "id, score, [elevated]")
, "//*[@name='ngroups'][.='3']"
, "//*[@name='matches'][.='6']"
, groups +"/lst[1]//doc[1]/float[@name='id'][.='2.0']"
, groups +"/lst[1]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[1]//doc[2]/float[@name='id'][.='22.0']"
, groups +"/lst[1]//doc[2]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[2]//doc[1]/float[@name='id'][.='6.0']"
, groups +"/lst[2]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[2]//doc[2]/float[@name='id'][.='66.0']"
, groups +"/lst[2]//doc[2]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[3]//doc[1]/float[@name='id'][.='7.0']"
, groups +"/lst[3]//doc[1]/bool[@name='[elevated]'][.='true']"
, groups +"/lst[3]//doc[2]/float[@name='id'][.='77.0']"
, groups +"/lst[3]//doc[2]/bool[@name='[elevated]'][.='false']"
);
assertQ("force-elevated sorted group query",
req(CommonParams.Q, "AAAA",
CommonParams.QT, "/elevate",
CommonParams.SORT, "id asc",
QueryElevationParams.FORCE_ELEVATION, "true",
GroupParams.GROUP_FIELD, "str_s",
GroupParams.GROUP, "true",
GroupParams.GROUP_TOTAL_COUNT, "true",
GroupParams.GROUP_LIMIT, "100",
CommonParams.FL, "id, score, [elevated]")
, "//*[@name='ngroups'][.='3']"
, "//*[@name='matches'][.='6']"
, groups +"/lst[1]//doc[1]/float[@name='id'][.='7.0']"
, groups +"/lst[1]//doc[1]/bool[@name='[elevated]'][.='true']"
, groups +"/lst[1]//doc[2]/float[@name='id'][.='77.0']"
, groups +"/lst[1]//doc[2]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[2]//doc[1]/float[@name='id'][.='2.0']"
, groups +"/lst[2]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[2]//doc[2]/float[@name='id'][.='22.0']"
, groups +"/lst[2]//doc[2]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[3]//doc[1]/float[@name='id'][.='6.0']"
, groups +"/lst[3]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[3]//doc[2]/float[@name='id'][.='66.0']"
, groups +"/lst[3]//doc[2]/bool[@name='[elevated]'][.='false']"
);
assertQ("non-elevated because of sort within group query",
req(CommonParams.Q, "AAAA",
CommonParams.QT, "/elevate",
CommonParams.SORT, "id asc",
GroupParams.GROUP_SORT, "id desc",
GroupParams.GROUP_FIELD, "str_s",
GroupParams.GROUP, "true",
GroupParams.GROUP_TOTAL_COUNT, "true",
GroupParams.GROUP_LIMIT, "100",
CommonParams.FL, "id, score, [elevated]")
, "//*[@name='ngroups'][.='3']"
, "//*[@name='matches'][.='6']"
, groups +"/lst[1]//doc[1]/float[@name='id'][.='22.0']"
, groups +"/lst[1]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[1]//doc[2]/float[@name='id'][.='2.0']"
, groups +"/lst[1]//doc[2]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[2]//doc[1]/float[@name='id'][.='66.0']"
, groups +"/lst[2]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[2]//doc[2]/float[@name='id'][.='6.0']"
, groups +"/lst[2]//doc[2]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[3]//doc[1]/float[@name='id'][.='77.0']"
, groups +"/lst[3]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[3]//doc[2]/float[@name='id'][.='7.0']"
, groups +"/lst[3]//doc[2]/bool[@name='[elevated]'][.='true']"
);
assertQ("force elevated sort within sorted group query",
req(CommonParams.Q, "AAAA",
CommonParams.QT, "/elevate",
CommonParams.SORT, "id asc",
GroupParams.GROUP_SORT, "id desc",
QueryElevationParams.FORCE_ELEVATION, "true",
GroupParams.GROUP_FIELD, "str_s",
GroupParams.GROUP, "true",
GroupParams.GROUP_TOTAL_COUNT, "true",
GroupParams.GROUP_LIMIT, "100",
CommonParams.FL, "id, score, [elevated]")
, "//*[@name='ngroups'][.='3']"
, "//*[@name='matches'][.='6']"
, groups +"/lst[1]//doc[1]/float[@name='id'][.='7.0']"
, groups +"/lst[1]//doc[1]/bool[@name='[elevated]'][.='true']"
, groups +"/lst[1]//doc[2]/float[@name='id'][.='77.0']"
, groups +"/lst[1]//doc[2]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[2]//doc[1]/float[@name='id'][.='22.0']"
, groups +"/lst[2]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[2]//doc[2]/float[@name='id'][.='2.0']"
, groups +"/lst[2]//doc[2]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[3]//doc[1]/float[@name='id'][.='66.0']"
, groups +"/lst[3]//doc[1]/bool[@name='[elevated]'][.='false']"
, groups +"/lst[3]//doc[2]/float[@name='id'][.='6.0']"
, groups +"/lst[3]//doc[2]/bool[@name='[elevated]'][.='false']"
);
} finally {
delete();
}
}
@Test
public void testTrieFieldType() throws Exception {
try {


@@ -25,6 +25,7 @@ import org.apache.solr.search.SolrCache;
import org.apache.solr.search.SyntaxError;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.Ignore;
import java.io.IOException;
import java.util.ArrayList;
@@ -49,13 +50,21 @@ public class BJQParserTest extends SolrTestCaseJ4 {
int i = 0;
List<List<String[]>> blocks = createBlocks();
for (List<String[]> block : blocks) {
List<XmlDoc> updBlock = new ArrayList<>();
for (String[] doc : block) {
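// copy each doc's field/value pairs and append a unique "id" field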
String[] idDoc = Arrays.copyOf(doc,doc.length+2);
idDoc[doc.length]="id";
idDoc[doc.length+1]=Integer.toString(i);
assertU(add(doc(idDoc)));
updBlock.add(doc(idDoc));
i++;
}
// got XML for every doc; now nest all of them into the last (parent) doc
XmlDoc parentDoc = updBlock.get(updBlock.size()-1);
parentDoc.xml = parentDoc.xml.replace("</doc>",
updBlock.subList(0, updBlock.size()-1).toString().replaceAll("[\\[\\]]","")+"</doc>");
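// add the whole block in one update so the children are indexed as nested documents of the parent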
assertU(add(parentDoc));
if (random().nextBoolean()) {
assertU(commit());
// force empty segment (actually, this will no longer create an empty segment, only a new segments_n)
@@ -184,6 +193,7 @@ public class BJQParserTest extends SolrTestCaseJ4 {
}
@Test
@Ignore("SOLR-5168")
public void testGrandChildren() throws IOException {
assertQ(
req("q", "{!parent which=$parentfilter v=$children}", "children",


@@ -17,8 +17,10 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import org.apache.commons.io.FilenameUtils;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
@@ -44,6 +46,7 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrResourceLoader;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -455,14 +458,45 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
public JettySolrRunner createJetty(File solrHome, String dataDir, String shardList, String solrConfigOverride, String schemaOverride) throws Exception {
JettySolrRunner jetty = new JettySolrRunner(solrHome.getAbsolutePath(), context, 0, solrConfigOverride, schemaOverride, false, getExtraServlets());
// randomly test a relative solr.home path
if (random().nextBoolean()) {
solrHome = getRelativeSolrHomePath(solrHome);
}
JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context, 0, solrConfigOverride, schemaOverride, false, getExtraServlets());
jetty.setShards(shardList);
jetty.setDataDir(getDataDir(dataDir));
jetty.start();
return jetty;
}
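/**
* Turns an absolute solr home into a path relative to the current working directory,
* so the randomized branch above also exercises relative solr.home paths.
*/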
private File getRelativeSolrHomePath(File solrHome) {
String path = SolrResourceLoader.normalizeDir(new File(".").getAbsolutePath());
String base = new File(solrHome.getPath()).getAbsolutePath();
if (base.startsWith(".")) {
base = base.replaceFirst("\\.", new File(".").getName());
}
if (path.endsWith(File.separator + ".")) {
path = path.substring(0, path.length() - 2);
}
int splits = path.split("\\" + File.separator).length;
StringBuilder p = new StringBuilder();
for (int i = 0; i < splits - 2; i++) {
p.append(".." + File.separator);
}
String prefix = FilenameUtils.getPrefix(path);
if (base.startsWith(prefix)) {
base = base.substring(prefix.length());
}
solrHome = new File(p.toString() + base);
return solrHome;
}
protected void updateMappingsFromZk(List<JettySolrRunner> jettys,
List<SolrServer> clients) throws Exception {


@@ -21,6 +21,9 @@ import org.apache.lucene.index.*;
import org.apache.lucene.index.MergePolicy.MergeSpecification;
import org.apache.lucene.util.LuceneTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.io.IOException;
@@ -32,6 +35,8 @@ import java.io.IOException;
*/
public final class RandomMergePolicy extends MergePolicy {
public static Logger log = LoggerFactory.getLogger(RandomMergePolicy.class);
/**
* Not private so tests can inspect it;
* not final so it can be set on clone.
@@ -45,6 +50,8 @@ public final class RandomMergePolicy extends MergePolicy {
super(inner.getNoCFSRatio(),
(long) (inner.getMaxCFSSegmentSizeMB() * 1024 * 1024));
this.inner = inner;
log.info("RandomMergePolicy wrapping {}: {}",
inner.getClass(), inner);
}
public RandomMergePolicy clone() {


@@ -41,6 +41,8 @@ limitations under the License.
<link rel="stylesheet" type="text/css" href="css/styles/schema-browser.css?_=${version}">
<link rel="stylesheet" type="text/css" href="css/styles/threads.css?_=${version}">
<link rel="stylesheet" type="text/css" href="css/chosen.css?_=${version}">
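<!-- force Internet Explorer to render in IE9 document mode instead of compatibility view -->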
<meta http-equiv="x-ua-compatible" content="IE=9">
<script type="text/javascript">


@@ -552,7 +552,7 @@ var solr_admin = function( app_config )
json_str = JSON.stringify( JSON.parse( json_str ), undefined, 2 );
}
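// escape the formatted JSON before it is inserted into the page markup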
return json_str;
return json_str.esc();
};
this.format_number = function format_number( number )