diff --git a/CHANGES.txt b/CHANGES.txt index 76ddec75d87..c39fd07253d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -120,6 +120,10 @@ Bug fixes was set has no effect - it is masked by the similarity of the MultiSearcher. This is as designed, because MultiSearcher operates on Searchables (not Searchers). (Doron Cohen) +15. LUCENE-880: Fixed DocumentWriter to close the TokenStreams after it + has written the postings. Then the resources associated with the + TokenStreams can safely be released. (Michael Busch) + New features 1. LUCENE-759: Added two n-gram-producing TokenFilters. diff --git a/src/java/org/apache/lucene/index/DocumentWriter.java b/src/java/org/apache/lucene/index/DocumentWriter.java index 45ed02f7170..d95bf7fbe6f 100644 --- a/src/java/org/apache/lucene/index/DocumentWriter.java +++ b/src/java/org/apache/lucene/index/DocumentWriter.java @@ -35,6 +35,8 @@ import java.util.BitSet; import java.util.Enumeration; import java.util.Hashtable; import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; final class DocumentWriter { private Analyzer analyzer; @@ -84,46 +86,67 @@ final class DocumentWriter { fieldBoosts = new float[fieldInfos.size()]; // init fieldBoosts Arrays.fill(fieldBoosts, doc.getBoost()); - // Before we write the FieldInfos we invert the Document. The reason is that - // during invertion the TokenStreams of tokenized fields are being processed - // and we might encounter tokens that have payloads associated with them. In - // this case we have to update the FieldInfo of the particular field. - invertDocument(doc); - - // sort postingTable into an array - Posting[] postings = sortPostingTable(); - - // write field infos - fieldInfos.write(directory, segment + ".fnm"); - - // write field values - FieldsWriter fieldsWriter = - new FieldsWriter(directory, segment, fieldInfos); try { - fieldsWriter.addDocument(doc); - } finally { - fieldsWriter.close(); - } - /* - for (int i = 0; i < postings.length; i++) { - Posting posting = postings[i]; - System.out.print(posting.term); - System.out.print(" freq=" + posting.freq); - System.out.print(" pos="); - System.out.print(posting.positions[0]); - for (int j = 1; j < posting.freq; j++) - System.out.print("," + posting.positions[j]); - System.out.println(""); + // Before we write the FieldInfos we invert the Document. The reason is that + // during invertion the TokenStreams of tokenized fields are being processed + // and we might encounter tokens that have payloads associated with them. In + // this case we have to update the FieldInfo of the particular field. + invertDocument(doc); + + // sort postingTable into an array + Posting[] postings = sortPostingTable(); + + // write field infos + fieldInfos.write(directory, segment + ".fnm"); + + // write field values + FieldsWriter fieldsWriter = + new FieldsWriter(directory, segment, fieldInfos); + try { + fieldsWriter.addDocument(doc); + } finally { + fieldsWriter.close(); + } + + /* + for (int i = 0; i < postings.length; i++) { + Posting posting = postings[i]; + System.out.print(posting.term); + System.out.print(" freq=" + posting.freq); + System.out.print(" pos="); + System.out.print(posting.positions[0]); + for (int j = 1; j < posting.freq; j++) + System.out.print("," + posting.positions[j]); + System.out.println(""); + } + */ + + // write postings + writePostings(postings, segment); + + // write norms of indexed fields + writeNorms(segment); + } finally { + // close TokenStreams + IOException ex = null; + + Iterator it = openTokenStreams.iterator(); + while (it.hasNext()) { + try { + ((TokenStream) it.next()).close(); + } catch (IOException e) { + if (ex != null) { + ex = e; + } + } + } + openTokenStreams.clear(); + + if (ex != null) { + throw ex; + } } - */ - - // write postings - writePostings(postings, segment); - - // write norms of indexed fields - writeNorms(segment); - } // Keys are Terms, values are Postings. @@ -137,6 +160,10 @@ final class DocumentWriter { // If any of the tokens of a paticular field carry a payload // then we enable payloads for that field. private BitSet fieldStoresPayloads; + + // Keep references of the token streams. We must close them after + // the postings are written to the segment. + private List openTokenStreams = new LinkedList(); // Tokenizes the fields of a document into Postings. private final void invertDocument(Document doc) @@ -181,42 +208,41 @@ final class DocumentWriter { stream = analyzer.tokenStream(fieldName, reader); } + // remember this TokenStream, we must close it later + openTokenStreams.add(stream); + // reset the TokenStream to the first token stream.reset(); - try { - Token lastToken = null; - for (Token t = stream.next(); t != null; t = stream.next()) { - position += (t.getPositionIncrement() - 1); + + Token lastToken = null; + for (Token t = stream.next(); t != null; t = stream.next()) { + position += (t.getPositionIncrement() - 1); - Payload payload = t.getPayload(); - if (payload != null) { - // enable payloads for this field - fieldStoresPayloads.set(fieldNumber); - } - - TermVectorOffsetInfo termVectorOffsetInfo; - if (field.isStoreOffsetWithTermVector()) { - termVectorOffsetInfo = new TermVectorOffsetInfo(offset + t.startOffset(), offset + t.endOffset()); - } else { - termVectorOffsetInfo = null; - } - addPosition(fieldName, t.termText(), position++, payload, termVectorOffsetInfo); - - lastToken = t; - if (++length >= maxFieldLength) { - if (infoStream != null) - infoStream.println("maxFieldLength " +maxFieldLength+ " reached, ignoring following tokens"); - break; - } + Payload payload = t.getPayload(); + if (payload != null) { + // enable payloads for this field + fieldStoresPayloads.set(fieldNumber); + } + + TermVectorOffsetInfo termVectorOffsetInfo; + if (field.isStoreOffsetWithTermVector()) { + termVectorOffsetInfo = new TermVectorOffsetInfo(offset + t.startOffset(), offset + t.endOffset()); + } else { + termVectorOffsetInfo = null; + } + addPosition(fieldName, t.termText(), position++, payload, termVectorOffsetInfo); + + lastToken = t; + if (++length >= maxFieldLength) { + if (infoStream != null) + infoStream.println("maxFieldLength " +maxFieldLength+ " reached, ignoring following tokens"); + break; } - - if(lastToken != null) - offset += lastToken.endOffset() + 1; - - } finally { - stream.close(); } + + if(lastToken != null) + offset += lastToken.endOffset() + 1; } fieldLengths[fieldNumber] = length; // save field length diff --git a/src/test/org/apache/lucene/index/TestPayloads.java b/src/test/org/apache/lucene/index/TestPayloads.java index 203168b48cd..adac6716482 100644 --- a/src/test/org/apache/lucene/index/TestPayloads.java +++ b/src/test/org/apache/lucene/index/TestPayloads.java @@ -20,7 +20,9 @@ package org.apache.lucene.index; import java.io.File; import java.io.IOException; import java.io.Reader; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; @@ -319,10 +321,15 @@ public class TestPayloads extends TestCase { } - private byte[] generateRandomData(int n) { - Random rnd = new Random(); - byte[] data = new byte[n]; + private static Random rnd = new Random(); + + private static void generateRandomData(byte[] data) { rnd.nextBytes(data); + } + + private static byte[] generateRandomData(int n) { + byte[] data = new byte[n]; + generateRandomData(data); return data; } @@ -439,5 +446,107 @@ public class TestPayloads extends TestCase { return nextToken; } - } + } + + public void testThreadSafety() throws IOException { + final int numThreads = 5; + final int numDocs = 50; + final ByteArrayPool pool = new ByteArrayPool(numThreads, 5); + + Directory dir = new RAMDirectory(); + final IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer()); + final String field = "test"; + + Thread[] ingesters = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + ingesters[i] = new Thread() { + public void run() { + try { + for (int j = 0; j < numDocs; j++) { + Document d = new Document(); + d.add(new Field(field, new PoolingPayloadTokenStream(pool))); + writer.addDocument(d); + } + } catch (IOException e) { + fail(e.toString()); + } + } + }; + ingesters[i].start(); + } + + for (int i = 0; i < numThreads; i++) { + try { + ingesters[i].join(); + } catch (InterruptedException e) {} + } + + writer.close(); + IndexReader reader = IndexReader.open(dir); + TermEnum terms = reader.terms(); + while (terms.next()) { + TermPositions tp = reader.termPositions(terms.term()); + while(tp.next()) { + int freq = tp.freq(); + for (int i = 0; i < freq; i++) { + tp.nextPosition(); + String s = new String(tp.getPayload(new byte[5], 0)); + assertEquals(s, terms.term().text); + } + } + tp.close(); + } + terms.close(); + reader.close(); + + assertEquals(pool.size(), numThreads); + } + + private static class PoolingPayloadTokenStream extends TokenStream { + private byte[] payload; + private boolean first; + private ByteArrayPool pool; + + PoolingPayloadTokenStream(ByteArrayPool pool) { + this.pool = pool; + payload = pool.get(); + generateRandomData(payload); + first = true; + } + + public Token next() throws IOException { + if (!first) return null; + Token t = new Token(new String(payload), 0, 0); + t.setPayload(new Payload(payload)); + return t; + } + + public void close() throws IOException { + pool.release(payload); + } + + } + + private static class ByteArrayPool { + private List pool; + + ByteArrayPool(int capacity, int size) { + pool = new ArrayList(); + for (int i = 0; i < capacity; i++) { + pool.add(new byte[size]); + } + } + + synchronized byte[] get() { + return (byte[]) pool.remove(0); + } + + synchronized void release(byte[] b) { + pool.add(b); + } + + synchronized int size() { + return pool.size(); + } + } }