LUCENE-3692: DocumentsWriter blocks flushes when applyDeletes takes forever

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1232017 13f79535-47bb-0310-9956-ffa450edef68
Simon Willnauer 2012-01-16 15:07:04 +00:00
parent 98e59fceee
commit ed4894dd8f
2 changed files with 229 additions and 104 deletions
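
In short: purging the flush-ticket queue (which applies deletes and can take a long time) used to run inside a synchronized (ticketQueue) block, so a slow applyDeletes stalled every concurrent flush that merely wanted to append a ticket. The new DocumentsWriterFlushQueue guards publishing with a dedicated ReentrantLock that flushing threads only tryLock. A minimal sketch of that pattern (a simplified illustration, not the committed code; drainQueue stands in for the real purge loop):

import java.util.concurrent.locks.ReentrantLock;

class PurgeSketch {
  private final ReentrantLock purgeLock = new ReentrantLock();

  // After a concurrent segment flush: never wait, another thread may be purging.
  void tryPurge() {
    if (purgeLock.tryLock()) {
      try { drainQueue(); } finally { purgeLock.unlock(); }
    }
  }

  // On full flush / global deletes: the queue must be drained, so block.
  void forcePurge() {
    purgeLock.lock();
    try { drainQueue(); } finally { purgeLock.unlock(); }
  }

  private void drainQueue() { /* publish eligible tickets in FIFO order */ }
}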

View File

@@ -20,13 +20,12 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
@@ -117,7 +116,7 @@ final class DocumentsWriter {
// TODO: cut over to BytesRefHash in BufferedDeletes
volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
private final TicketQueue ticketQueue = new TicketQueue();
private final DocumentsWriterFlushQueue ticketQueue = new DocumentsWriterFlushQueue();
/*
* we preserve changes during a full flush since IW might not checkout before
* we release all changes. NRT Readers otherwise suddenly return true from
@@ -177,12 +176,7 @@ final class DocumentsWriter {
private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
if (deleteQueue != null && !flushControl.isFullFlush()) {
synchronized (ticketQueue) {
ticketQueue.incTicketCount();// first inc the ticket count - freeze opens a window for #anyChanges to fail
// Freeze and insert the delete flush ticket in the queue
ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
applyFlushTickets();
}
ticketQueue.addDeletesAndPurge(this, deleteQueue);
}
indexWriter.applyAllDeletes();
indexWriter.flushCount.incrementAndGet();
@@ -401,7 +395,7 @@ final class DocumentsWriter {
while (flushingDWPT != null) {
maybeMerge = true;
boolean success = false;
FlushTicket ticket = null;
SegmentFlushTicket ticket = null;
try {
assert currentFullFlushDelQueue == null
|| flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
@@ -422,34 +416,27 @@ final class DocumentsWriter {
* might fail to delete documents in 'A'.
*/
try {
synchronized (ticketQueue) {
// Each flush is assigned a ticket in the order they acquire the ticketQueue lock
ticket = new FlushTicket(flushingDWPT.prepareFlush(), true);
ticketQueue.incrementAndAdd(ticket);
}
// Flushes are assigned tickets in the order in which they acquire the ticketQueue lock
ticket = ticketQueue.addFlushTicket(flushingDWPT);
// flush concurrently without locking
final FlushedSegment newSegment = flushingDWPT.flush();
synchronized (ticketQueue) {
ticket.segment = newSegment;
}
ticketQueue.addSegment(ticket, newSegment);
// flush was successful once we reached this point - new seg. has been assigned to the ticket!
success = true;
} finally {
if (!success && ticket != null) {
synchronized (ticketQueue) {
// In the case of a failure make sure we are making progress and
// apply all the deletes since the segment flush failed since the flush
// ticket could hold global deletes see FlushTicket#canPublish()
ticket.isSegmentFlush = false;
}
// In the case of a failure, make sure we are making progress and
// apply all the deletes, since the segment flush failed and the flush
// ticket could hold global deletes; see FlushTicket#canPublish()
ticketQueue.markTicketFailed(ticket);
}
}
/*
* Now we are done; try to purge the ticket queue if the head of the
* queue has already finished its flush.
*/
applyFlushTickets();
ticketQueue.tryPurge(this);
} finally {
flushControl.doAfterFlush(flushingDWPT);
flushingDWPT.checkAndResetHasAborted();
@@ -476,25 +463,7 @@ final class DocumentsWriter {
return maybeMerge;
}
private void applyFlushTickets() throws IOException {
synchronized (ticketQueue) {
while (true) {
// Keep publishing eligible flushed segments:
final FlushTicket head = ticketQueue.peek();
if (head != null && head.canPublish()) {
try {
finishFlush(head.segment, head.frozenDeletes);
} finally {
ticketQueue.poll();
}
} else {
break;
}
}
}
}
private void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
throws IOException {
// Finish the flushed segment and publish it to IndexWriter
if (newSegment == null) {
@@ -590,13 +559,11 @@ final class DocumentsWriter {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
synchronized (ticketQueue) {
ticketQueue.incTicketCount(); // first inc the ticket count - freeze opens a window for #anyChanges to fail
ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
}
applyFlushTickets();
ticketQueue.addDeletesAndPurge(this, flushingDeleteQueue);
} else {
ticketQueue.forcePurge(this);
}
assert !flushingDeleteQueue.anyChanges();
assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
} finally {
assert flushingDeleteQueue == currentFullFlushDelQueue;
}
@@ -621,61 +588,8 @@ final class DocumentsWriter {
}
static final class FlushTicket {
final FrozenBufferedDeletes frozenDeletes;
/* access to non-final members must be synchronized on DW#ticketQueue */
FlushedSegment segment;
boolean isSegmentFlush;
FlushTicket(FrozenBufferedDeletes frozenDeletes, boolean isSegmentFlush) {
this.frozenDeletes = frozenDeletes;
this.isSegmentFlush = isSegmentFlush;
}
boolean canPublish() {
return (!isSegmentFlush || segment != null);
}
}
static final class TicketQueue {
private final Queue<FlushTicket> queue = new LinkedList<FlushTicket>();
final AtomicInteger ticketCount = new AtomicInteger();
void incTicketCount() {
ticketCount.incrementAndGet();
}
public boolean hasTickets() {
assert ticketCount.get() >= 0;
return ticketCount.get() != 0;
}
void incrementAndAdd(FlushTicket ticket) {
incTicketCount();
add(ticket);
}
void add(FlushTicket ticket) {
queue.add(ticket);
}
FlushTicket peek() {
return queue.peek();
}
FlushTicket poll() {
try {
return queue.poll();
} finally {
ticketCount.decrementAndGet();
}
}
void clear() {
queue.clear();
ticketCount.set(0);
}
}
// used by IW during close to assert all DWPTs are inactive after the final flush
boolean assertNoActiveDWPT() {

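Taken together, the new doFlush path in this file follows the pattern below (a condensed sketch of the diff above; assertions and the full-flush bookkeeping are omitted):

SegmentFlushTicket ticket = null;
boolean success = false;
try {
  // synced on the queue: freezes global deletes and fixes the publish order
  ticket = ticketQueue.addFlushTicket(flushingDWPT);
  // the actual flush runs concurrently, no queue lock held
  final FlushedSegment newSegment = flushingDWPT.flush();
  // hand the segment to the ticket; it is now eligible for publishing
  ticketQueue.addSegment(ticket, newSegment);
  success = true;
} finally {
  if (!success && ticket != null) {
    // keep the queue making progress even though this flush failed
    ticketQueue.markTicketFailed(ticket);
  }
}
// best effort: publish finished heads unless another thread is already purging
ticketQueue.tryPurge(this);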
View File

@@ -0,0 +1,211 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
/**
* @lucene.internal
*/
public class DocumentsWriterFlushQueue {
private final Queue<FlushTicket> queue = new LinkedList<FlushTicket>();
// We track the ticket count separately, since the count must be visible even
// before the ticket is constructed, i.e. queue.size() would not reflect it.
private final AtomicInteger ticketCount = new AtomicInteger();
private final ReentrantLock purgeLock = new ReentrantLock();
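// Locking summary: queue mutations and ticket-state changes are synchronized
// on this instance, while publishing (which calls back into DocumentsWriter /
// IndexWriter and may be slow) is guarded by purgeLock, so appending threads
// are never blocked behind a long-running purge.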
synchronized void addDeletesAndPurge(DocumentsWriter writer,
DocumentsWriterDeleteQueue deleteQueue) throws IOException {
incTickets(); // first inc the ticket count - freeze opens a window for #anyChanges to fail
boolean success = false;
try {
queue.add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null)));
success = true;
} finally {
if (!success) {
decTickets();
}
}
forcePurge(writer);
}
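// Note: the ticket count is incremented before the ticket is built because
// freezing the global buffer opens a window in which anyChanges() would
// otherwise fail to see the pending ticket (see comment in addDeletesAndPurge).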
private void incTickets() {
int numTickets = ticketCount.incrementAndGet();
assert numTickets > 0;
}
private void decTickets() {
int numTickets = ticketCount.decrementAndGet();
assert numTickets >= 0;
}
synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) {
// Flushes are assigned tickets in the order in which they acquire the ticketQueue lock
incTickets();
boolean success = false;
try {
// prepareFlush() freezes the global deletes - do this in the synced block!
final SegmentFlushTicket ticket = new SegmentFlushTicket(dwpt.prepareFlush());
queue.add(ticket);
success = true;
return ticket;
} finally {
if (!success) {
decTickets();
}
}
}
synchronized void addSegment(SegmentFlushTicket ticket, FlushedSegment segment) {
// the actual flush is done asynchronously, and once done, the FlushedSegment
// is passed to the flush ticket
ticket.setSegment(segment);
}
synchronized void markTicketFailed(SegmentFlushTicket ticket) {
// mark the ticket as failed so the queue can be cleaned up and keep making progress.
ticket.setFailed();
}
boolean hasTickets() {
assert ticketCount.get() >= 0 : "ticketCount should be >= 0 but was: " + ticketCount.get();
return ticketCount.get() != 0;
}
private void innerPurge(DocumentsWriter writer) throws IOException {
assert purgeLock.isHeldByCurrentThread();
while (true) {
final FlushTicket head;
final boolean canPublish;
synchronized (this) {
head = queue.peek();
canPublish = head != null && head.canPublish(); // do this synced
}
if (canPublish) {
try {
/*
* If we block on publish -> lock IW -> lock BufferedDeletes, we don't block
* concurrent segment flushes just because they want to append to the queue.
* The downside is that we need to force a purge on fullFlush since there
* could still be a ticket in the queue.
*/
head.publish(writer);
} finally {
synchronized (this) {
// finally remove the published ticket from the queue
final FlushTicket poll = queue.poll();
ticketCount.decrementAndGet();
assert poll == head;
}
}
} else {
break;
}
}
}
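// forcePurge blocks until the purge lock is free and is used where the queue
// must be fully drained (global deletes, the full-flush path); tryPurge is the
// best-effort variant used after concurrent segment flushes.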
void forcePurge(DocumentsWriter writer) throws IOException {
purgeLock.lock();
try {
innerPurge(writer);
} finally {
purgeLock.unlock();
}
}
void tryPurge(DocumentsWriter writer) throws IOException {
if (purgeLock.tryLock()) {
try {
innerPurge(writer);
} finally {
purgeLock.unlock();
}
}
}
synchronized void clear() {
queue.clear();
ticketCount.set(0);
}
static abstract class FlushTicket {
protected FrozenBufferedDeletes frozenDeletes;
protected boolean published = false;
protected FlushTicket(FrozenBufferedDeletes frozenDeletes) {
assert frozenDeletes != null;
this.frozenDeletes = frozenDeletes;
}
protected abstract void publish(DocumentsWriter writer) throws IOException;
protected abstract boolean canPublish();
}
static final class GlobalDeletesTicket extends FlushTicket {
protected GlobalDeletesTicket(FrozenBufferedDeletes frozenDeletes) {
super(frozenDeletes);
}
protected void publish(DocumentsWriter writer) throws IOException {
assert !published : "ticket was already published - cannot publish twice";
published = true;
// it's a global deletes ticket - no segment to publish
writer.finishFlush(null, frozenDeletes);
}
protected boolean canPublish() {
return true;
}
}
static final class SegmentFlushTicket extends FlushTicket {
private FlushedSegment segment;
private boolean failed = false;
protected SegmentFlushTicket(FrozenBufferedDeletes frozenDeletes) {
super(frozenDeletes);
}
protected void publish(DocumentsWriter writer) throws IOException {
assert !published : "ticket was already published - cannot publish twice";
published = true;
writer.finishFlush(segment, frozenDeletes);
}
protected void setSegment(FlushedSegment segment) {
assert !failed;
this.segment = segment;
}
protected void setFailed() {
assert segment == null;
failed = true;
}
protected boolean canPublish() {
return segment != null || failed;
}
}
}
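
The ordering guarantee the queue provides can be illustrated with a hypothetical timeline (writer, dwptA/dwptB, and the flushed segments segA/segB are assumed to exist; this is not code from the commit):

// Tickets are created in lock-acquisition order and published strictly FIFO.
SegmentFlushTicket a = queue.addFlushTicket(dwptA); // freezes deletes up to A
SegmentFlushTicket b = queue.addFlushTicket(dwptB); // freezes deletes since A
// both flushes run concurrently; suppose B finishes first:
queue.addSegment(b, segB);  // b can publish, but waits behind the head ticket a
queue.tryPurge(writer);     // publishes nothing - head a has no segment yet
queue.addSegment(a, segA);  // now the head is publishable
queue.tryPurge(writer);     // publishes a, then b: deletes are applied in order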