LUCENE-880: Fixed DocumentWriter to close the TokenStreams after it has written the postings.

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@538892 13f79535-47bb-0310-9956-ffa450edef68
Michael Busch 2007-05-17 12:38:43 +00:00
parent b5e09521b5
commit 56547a1f36
3 changed files with 210 additions and 71 deletions
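For context, a minimal sketch of the situation this fix is for: a field fed directly by a pre-built TokenStream whose close() releases a resource. Everything named TokenStreamCloseSketch and BufferReleasingStream below is made up for illustration (in the spirit of the PoolingPayloadTokenStream added to TestPayloads in this commit) and is not part of the change; the Lucene calls used are the ones already exercised by the new test.

// Illustrative sketch only, not part of the commit.
import java.io.IOException;
import org.apache.lucene.analysis.Token;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.RAMDirectory;

class TokenStreamCloseSketch {

    // Hypothetical stream that owns a resource (e.g. a pooled buffer) which may
    // only be released once the indexer is done consuming its tokens.
    static class BufferReleasingStream extends TokenStream {
        private boolean done = false;

        public Token next() throws IOException {
            if (done) return null;
            done = true;
            return new Token("payload-carrier", 0, 0);
        }

        public void close() throws IOException {
            // release the underlying resource here; with this commit,
            // DocumentWriter calls close() only after the postings are written
        }
    }

    public static void main(String[] args) throws Exception {
        IndexWriter writer = new IndexWriter(new RAMDirectory(), new WhitespaceAnalyzer());
        Document d = new Document();
        d.add(new Field("test", new BufferReleasingStream()));
        writer.addDocument(d);   // the stream is closed after the postings are written
        writer.close();
    }
}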

View File

@@ -120,6 +120,10 @@ Bug fixes
    was set has no effect - it is masked by the similarity of the MultiSearcher. This is as
    designed, because MultiSearcher operates on Searchables (not Searchers). (Doron Cohen)
15. LUCENE-880: Fixed DocumentWriter to close the TokenStreams after it
has written the postings. Then the resources associated with the
TokenStreams can safely be released. (Michael Busch)
New features

 1. LUCENE-759: Added two n-gram-producing TokenFilters.

View File

@@ -35,6 +35,8 @@ import java.util.BitSet;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
final class DocumentWriter {
  private Analyzer analyzer;
@@ -84,46 +86,67 @@ final class DocumentWriter {
    fieldBoosts = new float[fieldInfos.size()];       // init fieldBoosts
    Arrays.fill(fieldBoosts, doc.getBoost());

    try {
      // Before we write the FieldInfos we invert the Document. The reason is that
      // during inversion the TokenStreams of tokenized fields are being processed
      // and we might encounter tokens that have payloads associated with them. In
      // this case we have to update the FieldInfo of the particular field.
      invertDocument(doc);

      // sort postingTable into an array
      Posting[] postings = sortPostingTable();

      // write field infos
      fieldInfos.write(directory, segment + ".fnm");

      // write field values
      FieldsWriter fieldsWriter =
        new FieldsWriter(directory, segment, fieldInfos);
      try {
        fieldsWriter.addDocument(doc);
      } finally {
        fieldsWriter.close();
      }

      /*
      for (int i = 0; i < postings.length; i++) {
        Posting posting = postings[i];
        System.out.print(posting.term);
        System.out.print(" freq=" + posting.freq);
        System.out.print(" pos=");
        System.out.print(posting.positions[0]);
        for (int j = 1; j < posting.freq; j++)
          System.out.print("," + posting.positions[j]);
        System.out.println("");
      }
      */

      // write postings
      writePostings(postings, segment);

      // write norms of indexed fields
      writeNorms(segment);

    } finally {
      // close TokenStreams
      IOException ex = null;

      Iterator it = openTokenStreams.iterator();
      while (it.hasNext()) {
        try {
          ((TokenStream) it.next()).close();
        } catch (IOException e) {
          if (ex == null) {          // remember only the first exception
            ex = e;
          }
        }
      }
      openTokenStreams.clear();

      if (ex != null) {
        throw ex;
      }
    }
  }
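As a hedged illustration of the comment above (why the document is inverted before the FieldInfos are written): a filter such as the following, which is not part of the commit, attaches a payload to every token. DocumentWriter only discovers the payloads while it consumes the stream during inversion, at which point the field's FieldInfo must be updated to record that the field stores payloads. MarkerPayloadFilter is a hypothetical name; the Payload class and Token.setPayload are the ones already used by this commit, assumed here to live in org.apache.lucene.index.

// Hypothetical example, not part of the commit.
import java.io.IOException;
import org.apache.lucene.analysis.Token;
import org.apache.lucene.analysis.TokenFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.index.Payload;

class MarkerPayloadFilter extends TokenFilter {
    private final byte[] marker = new byte[] { 1 };

    MarkerPayloadFilter(TokenStream input) {
        super(input);
    }

    public Token next() throws IOException {
        Token t = input.next();
        if (t != null) {
            // every token carries a one-byte payload, so the field must be
            // flagged as storing payloads while it is being inverted
            t.setPayload(new Payload(marker));
        }
        return t;
    }
}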
  // Keys are Terms, values are Postings.
@@ -138,6 +161,10 @@ final class DocumentWriter {
  // then we enable payloads for that field.
  private BitSet fieldStoresPayloads;
// Keep references of the token streams. We must close them after
// the postings are written to the segment.
private List openTokenStreams = new LinkedList();
  // Tokenizes the fields of a document into Postings.
  private final void invertDocument(Document doc)
          throws IOException {
@@ -181,42 +208,41 @@ final class DocumentWriter {
          stream = analyzer.tokenStream(fieldName, reader);
        }

        // remember this TokenStream, we must close it later
        openTokenStreams.add(stream);

        // reset the TokenStream to the first token
        stream.reset();
        Token lastToken = null;
        for (Token t = stream.next(); t != null; t = stream.next()) {
          position += (t.getPositionIncrement() - 1);

          Payload payload = t.getPayload();
          if (payload != null) {
            // enable payloads for this field
            fieldStoresPayloads.set(fieldNumber);
          }

          TermVectorOffsetInfo termVectorOffsetInfo;
          if (field.isStoreOffsetWithTermVector()) {
            termVectorOffsetInfo = new TermVectorOffsetInfo(offset + t.startOffset(), offset + t.endOffset());
          } else {
            termVectorOffsetInfo = null;
          }
          addPosition(fieldName, t.termText(), position++, payload, termVectorOffsetInfo);

          lastToken = t;
          if (++length >= maxFieldLength) {
            if (infoStream != null)
              infoStream.println("maxFieldLength " + maxFieldLength + " reached, ignoring following tokens");
            break;
          }
        }

        if (lastToken != null)
          offset += lastToken.endOffset() + 1;
      }

      fieldLengths[fieldNumber] = length;    // save field length

View File

@@ -20,7 +20,9 @@ package org.apache.lucene.index;
import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -319,10 +321,15 @@ public class TestPayloads extends TestCase {
    }

    private static Random rnd = new Random();

    private static void generateRandomData(byte[] data) {
        rnd.nextBytes(data);
    }

    private static byte[] generateRandomData(int n) {
        byte[] data = new byte[n];
        generateRandomData(data);
        return data;
    }
@@ -439,5 +446,107 @@ public class TestPayloads extends TestCase {
            return nextToken;
        }
    }
public void testThreadSafety() throws IOException {
final int numThreads = 5;
final int numDocs = 50;
final ByteArrayPool pool = new ByteArrayPool(numThreads, 5);
Directory dir = new RAMDirectory();
final IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
final String field = "test";
Thread[] ingesters = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
ingesters[i] = new Thread() {
public void run() {
try {
for (int j = 0; j < numDocs; j++) {
Document d = new Document();
d.add(new Field(field, new PoolingPayloadTokenStream(pool)));
writer.addDocument(d);
}
} catch (IOException e) {
fail(e.toString());
}
}
};
ingesters[i].start();
}
for (int i = 0; i < numThreads; i++) {
try {
ingesters[i].join();
} catch (InterruptedException e) {}
}
writer.close();
IndexReader reader = IndexReader.open(dir);
TermEnum terms = reader.terms();
while (terms.next()) {
TermPositions tp = reader.termPositions(terms.term());
while(tp.next()) {
int freq = tp.freq();
for (int i = 0; i < freq; i++) {
tp.nextPosition();
String s = new String(tp.getPayload(new byte[5], 0));
assertEquals(s, terms.term().text);
}
}
tp.close();
}
terms.close();
reader.close();
assertEquals(pool.size(), numThreads);
}
private static class PoolingPayloadTokenStream extends TokenStream {
private byte[] payload;
private boolean first;
private ByteArrayPool pool;
PoolingPayloadTokenStream(ByteArrayPool pool) {
this.pool = pool;
payload = pool.get();
generateRandomData(payload);
first = true;
}
public Token next() throws IOException {
if (!first) return null;
first = false;
Token t = new Token(new String(payload), 0, 0);
t.setPayload(new Payload(payload));
return t;
}
public void close() throws IOException {
pool.release(payload);
}
}
private static class ByteArrayPool {
private List pool;
ByteArrayPool(int capacity, int size) {
pool = new ArrayList();
for (int i = 0; i < capacity; i++) {
pool.add(new byte[size]);
}
}
synchronized byte[] get() {
return (byte[]) pool.remove(0);
}
synchronized void release(byte[] b) {
pool.add(b);
}
synchronized int size() {
return pool.size();
}
}
}