From ed4894dd8fb751fc4a287636a60d4c71f6ebb02b Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 16 Jan 2012 15:07:04 +0000 Subject: [PATCH] LUCENE-3692: DocumentsWriter blocks flushes when applyDeletes takes forever git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1232017 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/lucene/index/DocumentsWriter.java | 122 ++-------- .../index/DocumentsWriterFlushQueue.java | 211 ++++++++++++++++++ 2 files changed, 229 insertions(+), 104 deletions(-) create mode 100644 lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java index a084e03e78d..10f5b89b3a8 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -20,13 +20,12 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.Collection; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; -import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket; import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment; import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; @@ -117,7 +116,7 @@ final class DocumentsWriter { // TODO: cut over to BytesRefHash in BufferedDeletes volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue(); - private final TicketQueue ticketQueue = new TicketQueue(); + private final DocumentsWriterFlushQueue ticketQueue = new DocumentsWriterFlushQueue(); /* * we preserve changes during a full flush since IW might not checkout before * we release all changes. NRT Readers otherwise suddenly return true from @@ -177,12 +176,7 @@ final class DocumentsWriter { private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { if (deleteQueue != null && !flushControl.isFullFlush()) { - synchronized (ticketQueue) { - ticketQueue.incTicketCount();// first inc the ticket count - freeze opens a window for #anyChanges to fail - // Freeze and insert the delete flush ticket in the queue - ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false)); - applyFlushTickets(); - } + ticketQueue.addDeletesAndPurge(this, deleteQueue); } indexWriter.applyAllDeletes(); indexWriter.flushCount.incrementAndGet(); @@ -401,7 +395,7 @@ final class DocumentsWriter { while (flushingDWPT != null) { maybeMerge = true; boolean success = false; - FlushTicket ticket = null; + SegmentFlushTicket ticket = null; try { assert currentFullFlushDelQueue == null || flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: " @@ -422,34 +416,27 @@ final class DocumentsWriter { * might miss to deletes documents in 'A'. */ try { - synchronized (ticketQueue) { - // Each flush is assigned a ticket in the order they acquire the ticketQueue lock - ticket = new FlushTicket(flushingDWPT.prepareFlush(), true); - ticketQueue.incrementAndAdd(ticket); - } + // Each flush is assigned a ticket in the order they acquire the ticketQueue lock + ticket = ticketQueue.addFlushTicket(flushingDWPT); // flush concurrently without locking final FlushedSegment newSegment = flushingDWPT.flush(); - synchronized (ticketQueue) { - ticket.segment = newSegment; - } + ticketQueue.addSegment(ticket, newSegment); // flush was successful once we reached this point - new seg. has been assigned to the ticket! success = true; } finally { if (!success && ticket != null) { - synchronized (ticketQueue) { - // In the case of a failure make sure we are making progress and - // apply all the deletes since the segment flush failed since the flush - // ticket could hold global deletes see FlushTicket#canPublish() - ticket.isSegmentFlush = false; - } + // In the case of a failure make sure we are making progress and + // apply all the deletes since the segment flush failed since the flush + // ticket could hold global deletes see FlushTicket#canPublish() + ticketQueue.markTicketFailed(ticket); } } /* * Now we are done and try to flush the ticket queue if the head of the * queue has already finished the flush. */ - applyFlushTickets(); + ticketQueue.tryPurge(this); } finally { flushControl.doAfterFlush(flushingDWPT); flushingDWPT.checkAndResetHasAborted(); @@ -476,25 +463,7 @@ final class DocumentsWriter { return maybeMerge; } - private void applyFlushTickets() throws IOException { - synchronized (ticketQueue) { - while (true) { - // Keep publishing eligible flushed segments: - final FlushTicket head = ticketQueue.peek(); - if (head != null && head.canPublish()) { - try { - finishFlush(head.segment, head.frozenDeletes); - } finally { - ticketQueue.poll(); - } - } else { - break; - } - } - } - } - - private void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes) + void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes) throws IOException { // Finish the flushed segment and publish it to IndexWriter if (newSegment == null) { @@ -590,13 +559,11 @@ final class DocumentsWriter { if (infoStream.isEnabled("DW")) { infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes"); } - synchronized (ticketQueue) { - ticketQueue.incTicketCount(); // first inc the ticket count - freeze opens a window for #anyChanges to fail - ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false)); - } - applyFlushTickets(); + ticketQueue.addDeletesAndPurge(this, flushingDeleteQueue); + } else { + ticketQueue.forcePurge(this); } - assert !flushingDeleteQueue.anyChanges(); + assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets(); } finally { assert flushingDeleteQueue == currentFullFlushDelQueue; } @@ -621,61 +588,8 @@ final class DocumentsWriter { } - static final class FlushTicket { - final FrozenBufferedDeletes frozenDeletes; - /* access to non-final members must be synchronized on DW#ticketQueue */ - FlushedSegment segment; - boolean isSegmentFlush; - - FlushTicket(FrozenBufferedDeletes frozenDeletes, boolean isSegmentFlush) { - this.frozenDeletes = frozenDeletes; - this.isSegmentFlush = isSegmentFlush; - } - - boolean canPublish() { - return (!isSegmentFlush || segment != null); - } - } - static final class TicketQueue { - private final Queue queue = new LinkedList(); - final AtomicInteger ticketCount = new AtomicInteger(); - - void incTicketCount() { - ticketCount.incrementAndGet(); - } - - public boolean hasTickets() { - assert ticketCount.get() >= 0; - return ticketCount.get() != 0; - } - - void incrementAndAdd(FlushTicket ticket) { - incTicketCount(); - add(ticket); - } - - void add(FlushTicket ticket) { - queue.add(ticket); - } - - FlushTicket peek() { - return queue.peek(); - } - - FlushTicket poll() { - try { - return queue.poll(); - } finally { - ticketCount.decrementAndGet(); - } - } - - void clear() { - queue.clear(); - ticketCount.set(0); - } - } + // use by IW during close to assert all DWPT are inactive after final flush boolean assertNoActiveDWPT() { diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java new file mode 100644 index 00000000000..73d62505327 --- /dev/null +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java @@ -0,0 +1,211 @@ +package org.apache.lucene.index; +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment; + + +/** + * @lucene.internal + */ +public class DocumentsWriterFlushQueue { + private final Queue queue = new LinkedList(); + // we track tickets separately since count must be present even before the ticket is + // constructed ie. queue.size would not reflect it. + private final AtomicInteger ticketCount = new AtomicInteger(); + private final ReentrantLock purgeLock = new ReentrantLock(); + + synchronized void addDeletesAndPurge(DocumentsWriter writer, + DocumentsWriterDeleteQueue deleteQueue) throws IOException { + incTickets();// first inc the ticket count - freeze opens + // a window for #anyChanges to fail + boolean success = false; + try { + queue.add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null))); + success = true; + } finally { + if (!success) { + decTickets(); + } + } + forcePurge(writer); + } + + private void incTickets() { + int numTickets = ticketCount.incrementAndGet(); + assert numTickets > 0; + } + + private void decTickets() { + int numTickets = ticketCount.decrementAndGet(); + assert numTickets >= 0; + } + + synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) { + // Each flush is assigned a ticket in the order they acquire the ticketQueue + // lock + incTickets(); + boolean success = false; + try { + // prepare flush freezes the global deletes - do in synced block! + final SegmentFlushTicket ticket = new SegmentFlushTicket(dwpt.prepareFlush()); + queue.add(ticket); + success = true; + return ticket; + } finally { + if (!success) { + decTickets(); + } + } + } + + synchronized void addSegment(SegmentFlushTicket ticket, FlushedSegment segment) { + // the actual flush is done asynchronously and once done the FlushedSegment + // is passed to the flush ticket + ticket.setSegment(segment); + } + + synchronized void markTicketFailed(SegmentFlushTicket ticket) { + // to free the queue we mark tickets as failed just to clean up the queue. + ticket.setFailed(); + } + + boolean hasTickets() { + assert ticketCount.get() >= 0 : "ticketCount should be >= 0 but was: " + ticketCount.get(); + return ticketCount.get() != 0; + } + + private void innerPurge(DocumentsWriter writer) throws IOException { + assert purgeLock.isHeldByCurrentThread(); + while (true) { + final FlushTicket head; + final boolean canPublish; + synchronized (this) { + head = queue.peek(); + canPublish = head != null && head.canPublish(); // do this synced + } + if (canPublish) { + try { + /* + * if we bock on publish -> lock IW -> lock BufferedDeletes we don't block + * concurrent segment flushes just because they want to append to the queue. + * the downside is that we need to force a purge on fullFlush since ther could + * be a ticket still in the queue. + */ + head.publish(writer); + } finally { + synchronized (this) { + // finally remove the publised ticket from the queue + final FlushTicket poll = queue.poll(); + ticketCount.decrementAndGet(); + assert poll == head; + } + } + } else { + break; + } + } + } + + void forcePurge(DocumentsWriter writer) throws IOException { + purgeLock.lock(); + try { + innerPurge(writer); + } finally { + purgeLock.unlock(); + } + } + + void tryPurge(DocumentsWriter writer) throws IOException { + if (purgeLock.tryLock()) { + try { + innerPurge(writer); + } finally { + purgeLock.unlock(); + } + } + } + + synchronized void clear() { + queue.clear(); + ticketCount.set(0); + } + + static abstract class FlushTicket { + protected FrozenBufferedDeletes frozenDeletes; + protected boolean published = false; + + protected FlushTicket(FrozenBufferedDeletes frozenDeletes) { + assert frozenDeletes != null; + this.frozenDeletes = frozenDeletes; + } + + protected abstract void publish(DocumentsWriter writer) throws IOException; + protected abstract boolean canPublish(); + } + + static final class GlobalDeletesTicket extends FlushTicket{ + + protected GlobalDeletesTicket(FrozenBufferedDeletes frozenDeletes) { + super(frozenDeletes); + } + protected void publish(DocumentsWriter writer) throws IOException { + assert !published : "ticket was already publised - can not publish twice"; + published = true; + // its a global ticket - no segment to publish + writer.finishFlush(null, frozenDeletes); + } + + protected boolean canPublish() { + return true; + } + } + + static final class SegmentFlushTicket extends FlushTicket { + private FlushedSegment segment; + private boolean failed = false; + + protected SegmentFlushTicket(FrozenBufferedDeletes frozenDeletes) { + super(frozenDeletes); + } + + protected void publish(DocumentsWriter writer) throws IOException { + assert !published : "ticket was already publised - can not publish twice"; + published = true; + writer.finishFlush(segment, frozenDeletes); + } + + protected void setSegment(FlushedSegment segment) { + assert !failed; + this.segment = segment; + } + + protected void setFailed() { + assert segment == null; + failed = true; + } + + protected boolean canPublish() { + return segment != null || failed; + } + } +} \ No newline at end of file