LUCENE-1283: factor out ByteSliceWriter from DocumentsWriter

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@657397 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2008-05-17 16:49:18 +00:00
parent 73ab487b7b
commit 6bafbdf98f
7 changed files with 313 additions and 211 deletions

ByteBlockPool.java

@@ -37,6 +37,11 @@ import java.util.Arrays;
final class ByteBlockPool {
abstract static class Allocator {
abstract void recycleByteBlocks(byte[][] blocks, int start, int end);
abstract byte[] getByteBlock(boolean trackAllocations);
}
public byte[][] buffers = new byte[10][];
int bufferUpto = -1; // Which buffer we are upto
@@ -45,11 +50,11 @@ final class ByteBlockPool {
public byte[] buffer; // Current head buffer
public int byteOffset = -DocumentsWriter.BYTE_BLOCK_SIZE; // Current head offset
private boolean trackAllocations;
DocumentsWriter docWriter;
private final boolean trackAllocations;
private final Allocator allocator;
public ByteBlockPool(DocumentsWriter docWriter, boolean trackAllocations) {
this.docWriter = docWriter;
public ByteBlockPool(Allocator allocator, boolean trackAllocations) {
this.allocator = allocator;
this.trackAllocations = trackAllocations;
}
@@ -66,7 +71,7 @@ final class ByteBlockPool {
if (bufferUpto > 0)
// Recycle all but the first buffer
docWriter.recycleByteBlocks(buffers, 1, 1+bufferUpto);
allocator.recycleByteBlocks(buffers, 1, 1+bufferUpto);
// Re-use the first buffer
bufferUpto = 0;
@@ -82,7 +87,7 @@ final class ByteBlockPool {
System.arraycopy(buffers, 0, newBuffers, 0, buffers.length);
buffers = newBuffers;
}
buffer = buffers[1+bufferUpto] = docWriter.getByteBlock(trackAllocations);
buffer = buffers[1+bufferUpto] = allocator.getByteBlock(trackAllocations);
bufferUpto++;
byteUpto = 0;
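With this change the pool no longer reaches back into DocumentsWriter for its buffers; anything extending ByteBlockPool.Allocator can back it. A minimal sketch of the new contract (DirectAllocator is a hypothetical name, not part of this commit):

    final class DirectAllocator extends ByteBlockPool.Allocator {
      // Hand out a fresh block each time; no pooling, no RAM accounting.
      byte[] getByteBlock(boolean trackAllocations) {
        return new byte[DocumentsWriter.BYTE_BLOCK_SIZE];
      }
      // Nothing to recycle, since blocks are never pooled.
      void recycleByteBlocks(byte[][] blocks, int start, int end) {}
    }

    ByteBlockPool pool = new ByteBlockPool(new DirectAllocator(), false);

The new TestByteSlices below uses exactly this shape, plus a free list so recycled blocks get reused.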

ByteSliceWriter.java

@@ -0,0 +1,89 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Class to write byte streams into slices of shared
* byte[]. This is used by DocumentsWriter to hold the
* posting list for many terms in RAM.
*/
final class ByteSliceWriter {
private byte[] slice;
private int upto;
private final ByteBlockPool pool;
int offset0;
public ByteSliceWriter(ByteBlockPool pool) {
this.pool = pool;
}
/**
* Set up the writer to write at address.
*/
public void init(int address) {
slice = pool.buffers[address >> DocumentsWriter.BYTE_BLOCK_SHIFT];
assert slice != null;
upto = address & DocumentsWriter.BYTE_BLOCK_MASK;
offset0 = address;
assert upto < slice.length;
}
/** Write byte into byte slice stream */
public void writeByte(byte b) {
assert slice != null;
if (slice[upto] != 0) {
upto = pool.allocSlice(slice, upto);
slice = pool.buffer;
offset0 = pool.byteOffset;
assert slice != null;
}
slice[upto++] = b;
assert upto != slice.length;
}
public void writeBytes(final byte[] b, int offset, final int len) {
final int offsetEnd = offset + len;
while(offset < offsetEnd) {
if (slice[upto] != 0) {
// End marker
upto = pool.allocSlice(slice, upto);
slice = pool.buffer;
offset0 = pool.byteOffset;
}
slice[upto++] = b[offset++];
assert upto != slice.length;
}
}
public int getAddress() {
return upto + (offset0 & DocumentsWriter.BYTE_BLOCK_NOT_MASK);
}
public void writeVInt(int i) {
while ((i & ~0x7F) != 0) {
writeByte((byte)((i & 0x7f) | 0x80));
i >>>= 7;
}
writeByte((byte) i);
}
}
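The slice[upto] != 0 tests rely on ByteBlockPool's slice layout: slice bytes are zero-filled except for a non-zero end marker in the last byte, so hitting a non-zero byte means the slice is full and allocSlice chains a larger one. writeVInt emits seven payload bits per byte, low-order group first, with 0x80 flagging continuation, so for example 300 encodes as 0xAC 0x02. A usage sketch following the pattern in TestByteSlices below (variable names are illustrative):

    // Allocate a first slice and convert the in-buffer offset to a global address.
    int start = pool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE) + pool.byteOffset;

    ByteSliceWriter writer = new ByteSliceWriter(pool);
    writer.init(start);                   // position the writer at that address
    writer.writeVInt(300);                // 0xAC ((300 & 0x7f) | 0x80), then 0x02 (300 >>> 7)
    int resumeAddr = writer.getAddress(); // save; a later init(resumeAddr) appends here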

DocumentsWriter.java

@@ -1473,29 +1473,38 @@ final class DocumentsWriter {
final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
private ArrayList freeByteBlocks = new ArrayList();
private class ByteBlockAllocator extends ByteBlockPool.Allocator {
/* Allocate another byte[] from the shared pool */
synchronized byte[] getByteBlock(boolean trackAllocations) {
final int size = freeByteBlocks.size();
final byte[] b;
if (0 == size) {
numBytesAlloc += BYTE_BLOCK_SIZE;
balanceRAM();
b = new byte[BYTE_BLOCK_SIZE];
} else
b = (byte[]) freeByteBlocks.remove(size-1);
if (trackAllocations)
numBytesUsed += BYTE_BLOCK_SIZE;
assert numBytesUsed <= numBytesAlloc;
return b;
ArrayList freeByteBlocks = new ArrayList();
/* Allocate another byte[] from the shared pool */
byte[] getByteBlock(boolean trackAllocations) {
synchronized(DocumentsWriter.this) {
final int size = freeByteBlocks.size();
final byte[] b;
if (0 == size) {
numBytesAlloc += BYTE_BLOCK_SIZE;
balanceRAM();
b = new byte[BYTE_BLOCK_SIZE];
} else
b = (byte[]) freeByteBlocks.remove(size-1);
if (trackAllocations)
numBytesUsed += BYTE_BLOCK_SIZE;
assert numBytesUsed <= numBytesAlloc;
return b;
}
}
/* Return byte[]'s to the pool */
void recycleByteBlocks(byte[][] blocks, int start, int end) {
synchronized(DocumentsWriter.this) {
for(int i=start;i<end;i++)
freeByteBlocks.add(blocks[i]);
}
}
}
/* Return byte[]'s to the pool */
synchronized void recycleByteBlocks(byte[][] blocks, int start, int end) {
for(int i=start;i<end;i++)
freeByteBlocks.add(blocks[i]);
}
ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator();
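One subtlety in moving these methods into the inner class: the old versions were synchronized methods of DocumentsWriter, so their monitor was the DocumentsWriter instance. Simply marking the relocated methods synchronized would have locked the ByteBlockAllocator object instead; the explicit synchronized(DocumentsWriter.this) keeps the original monitor, which also guards numBytesAlloc, numBytesUsed and balanceRAM(). A toy illustration of the difference (hypothetical names):

    class Outer {
      class Inner {
        synchronized void a() { }                    // locks the Inner instance
        void b() { synchronized (Outer.this) { } }   // locks the enclosing Outer
      }
    }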
/* Initial chunk size of the shared char[] blocks used to
store term text */
@@ -1563,7 +1572,7 @@ final class DocumentsWriter {
" allocMB=" + toMB(numBytesAlloc) +
" vs trigger=" + toMB(freeTrigger) +
" postingsFree=" + toMB(postingsFreeCount*POSTING_NUM_BYTE) +
" byteBlockFree=" + toMB(freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
" byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
" charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*CHAR_NUM_BYTE));
// When we've crossed 100% of our target Postings
@@ -1580,7 +1589,7 @@ final class DocumentsWriter {
// (freeLevel)
while(numBytesAlloc > freeLevel) {
if (0 == freeByteBlocks.size() && 0 == freeCharBlocks.size() && 0 == postingsFreeCount) {
if (0 == byteBlockAllocator.freeByteBlocks.size() && 0 == freeCharBlocks.size() && 0 == postingsFreeCount) {
// Nothing else to free -- must flush now.
bufferIsFull = true;
if (infoStream != null)
@@ -1588,8 +1597,8 @@ final class DocumentsWriter {
break;
}
if ((0 == iter % 3) && freeByteBlocks.size() > 0) {
freeByteBlocks.remove(freeByteBlocks.size()-1);
if ((0 == iter % 3) && byteBlockAllocator.freeByteBlocks.size() > 0) {
byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
numBytesAlloc -= BYTE_BLOCK_SIZE;
}

DocumentsWriterFieldData.java

@@ -64,9 +64,14 @@ final class DocumentsWriterFieldData implements Comparable {
float boost;
int postingsVectorsUpto;
final ByteSliceWriter sliceWriter;
final ByteSliceWriter vectorsSliceWriter;
public DocumentsWriterFieldData(DocumentsWriterThreadState threadState, FieldInfo fieldInfo) {
this.fieldInfo = fieldInfo;
this.threadState = threadState;
sliceWriter = new ByteSliceWriter(threadState.postingsPool);
vectorsSliceWriter = new ByteSliceWriter(threadState.vectorsPool);
}
void resetPostingArrays() {
@@ -406,15 +411,15 @@ final class DocumentsWriterFieldData implements Comparable {
// Now that we know doc freq for previous doc,
// write it & lastDocCode
freqUpto = p.freqUpto & DocumentsWriter.BYTE_BLOCK_MASK;
freq = threadState.postingsPool.buffers[p.freqUpto >> DocumentsWriter.BYTE_BLOCK_SHIFT];
sliceWriter.init(p.freqUpto);
if (1 == p.docFreq)
writeFreqVInt(p.lastDocCode|1);
sliceWriter.writeVInt(p.lastDocCode|1);
else {
writeFreqVInt(p.lastDocCode);
writeFreqVInt(p.docFreq);
sliceWriter.writeVInt(p.lastDocCode);
sliceWriter.writeVInt(p.docFreq);
}
p.freqUpto = freqUpto + (p.freqUpto & DocumentsWriter.BYTE_BLOCK_NOT_MASK);
p.freqUpto = sliceWriter.getAddress();
if (doVectors) {
vector = addNewVector();
@@ -520,155 +525,37 @@ final class DocumentsWriterFieldData implements Comparable {
proxCode = position;
}
proxUpto = p.proxUpto & DocumentsWriter.BYTE_BLOCK_MASK;
prox = threadState.postingsPool.buffers[p.proxUpto >> DocumentsWriter.BYTE_BLOCK_SHIFT];
assert prox != null;
sliceWriter.init(p.proxUpto);
if (payload != null && payload.length > 0) {
writeProxVInt((proxCode<<1)|1);
writeProxVInt(payload.length);
writeProxBytes(payload.data, payload.offset, payload.length);
sliceWriter.writeVInt((proxCode<<1)|1);
sliceWriter.writeVInt(payload.length);
sliceWriter.writeBytes(payload.data, payload.offset, payload.length);
fieldInfo.storePayloads = true;
} else
writeProxVInt(proxCode<<1);
p.proxUpto = proxUpto + (p.proxUpto & DocumentsWriter.BYTE_BLOCK_NOT_MASK);
sliceWriter.writeVInt(proxCode<<1);
p.proxUpto = sliceWriter.getAddress();
p.lastPosition = position++;
if (doVectorPositions) {
posUpto = vector.posUpto & DocumentsWriter.BYTE_BLOCK_MASK;
pos = threadState.vectorsPool.buffers[vector.posUpto >> DocumentsWriter.BYTE_BLOCK_SHIFT];
writePosVInt(proxCode);
vector.posUpto = posUpto + (vector.posUpto & DocumentsWriter.BYTE_BLOCK_NOT_MASK);
vectorsSliceWriter.init(vector.posUpto);
vectorsSliceWriter.writeVInt(proxCode);
vector.posUpto = vectorsSliceWriter.getAddress();
}
if (doVectorOffsets) {
offsetUpto = vector.offsetUpto & DocumentsWriter.BYTE_BLOCK_MASK;
offsets = threadState.vectorsPool.buffers[vector.offsetUpto >> DocumentsWriter.BYTE_BLOCK_SHIFT];
writeOffsetVInt(offsetStartCode);
writeOffsetVInt(offsetEnd-offsetStart);
vectorsSliceWriter.init(vector.offsetUpto);
vectorsSliceWriter.writeVInt(offsetStartCode);
vectorsSliceWriter.writeVInt(offsetEnd-offsetStart);
vector.lastOffset = offsetEnd;
vector.offsetUpto = offsetUpto + (vector.offsetUpto & DocumentsWriter.BYTE_BLOCK_NOT_MASK);
vector.offsetUpto = vectorsSliceWriter.getAddress();
}
} catch (Throwable t) {
throw new AbortException(t, threadState.docWriter);
}
}
/** Write vInt into freq stream of current Posting */
public void writeFreqVInt(int i) {
while ((i & ~0x7F) != 0) {
writeFreqByte((byte)((i & 0x7f) | 0x80));
i >>>= 7;
}
writeFreqByte((byte) i);
}
/** Write vInt into prox stream of current Posting */
public void writeProxVInt(int i) {
while ((i & ~0x7F) != 0) {
writeProxByte((byte)((i & 0x7f) | 0x80));
i >>>= 7;
}
writeProxByte((byte) i);
}
/** Write byte into freq stream of current Posting */
byte[] freq;
int freqUpto;
public void writeFreqByte(byte b) {
assert freq != null;
if (freq[freqUpto] != 0) {
freqUpto = threadState.postingsPool.allocSlice(freq, freqUpto);
freq = threadState.postingsPool.buffer;
p.freqUpto = threadState.postingsPool.byteOffset;
}
freq[freqUpto++] = b;
}
/** Write byte into prox stream of current Posting */
byte[] prox;
int proxUpto;
public void writeProxByte(byte b) {
assert prox != null;
if (prox[proxUpto] != 0) {
proxUpto = threadState.postingsPool.allocSlice(prox, proxUpto);
prox = threadState.postingsPool.buffer;
p.proxUpto = threadState.postingsPool.byteOffset;
assert prox != null;
}
prox[proxUpto++] = b;
assert proxUpto != prox.length;
}
/** Currently only used to copy a payload into the prox
* stream. */
public void writeProxBytes(byte[] b, int offset, int len) {
final int offsetEnd = offset + len;
while(offset < offsetEnd) {
if (prox[proxUpto] != 0) {
// End marker
proxUpto = threadState.postingsPool.allocSlice(prox, proxUpto);
prox = threadState.postingsPool.buffer;
p.proxUpto = threadState.postingsPool.byteOffset;
}
prox[proxUpto++] = b[offset++];
assert proxUpto != prox.length;
}
}
/** Write vInt into offsets stream of current
* PostingVector */
public void writeOffsetVInt(int i) {
while ((i & ~0x7F) != 0) {
writeOffsetByte((byte)((i & 0x7f) | 0x80));
i >>>= 7;
}
writeOffsetByte((byte) i);
}
byte[] offsets;
int offsetUpto;
/** Write byte into offsets stream of current
* PostingVector */
public void writeOffsetByte(byte b) {
assert offsets != null;
if (offsets[offsetUpto] != 0) {
offsetUpto = threadState.vectorsPool.allocSlice(offsets, offsetUpto);
offsets = threadState.vectorsPool.buffer;
vector.offsetUpto = threadState.vectorsPool.byteOffset;
}
offsets[offsetUpto++] = b;
}
/** Write vInt into pos stream of current
* PostingVector */
public void writePosVInt(int i) {
while ((i & ~0x7F) != 0) {
writePosByte((byte)((i & 0x7f) | 0x80));
i >>>= 7;
}
writePosByte((byte) i);
}
byte[] pos;
int posUpto;
/** Write byte into pos stream of current
* PostingVector */
public void writePosByte(byte b) {
assert pos != null;
if (pos[posUpto] != 0) {
posUpto = threadState.vectorsPool.allocSlice(pos, posUpto);
pos = threadState.vectorsPool.buffer;
vector.posUpto = threadState.vectorsPool.byteOffset;
}
pos[posUpto++] = b;
}
/** Called when postings hash is too small (> 50%
* occupied) or too large (< 20% occupied). */
void rehashPostings(final int newSize) {
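The long deletion above is the payoff of the refactor: four near-duplicate writer families (freq, prox, offsets, pos), each with its own byte[]/upto pair and byte, VInt and bytes variants, collapse into two ByteSliceWriter instances, one per pool. Every call site now follows the same three-step pattern seen earlier in this diff:

    sliceWriter.init(p.freqUpto);           // jump to this posting's freq stream
    sliceWriter.writeVInt(p.lastDocCode);   // append to the slice chain
    p.freqUpto = sliceWriter.getAddress();  // remember where to resume next time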

DocumentsWriterThreadState.java

@@ -80,8 +80,8 @@ final class DocumentsWriterThreadState {
postingsFreeList = new Posting[256];
postingsFreeCount = 0;
postingsPool = new ByteBlockPool(docWriter, true);
vectorsPool = new ByteBlockPool(docWriter, false);
postingsPool = new ByteBlockPool(docWriter.byteBlockAllocator, true);
vectorsPool = new ByteBlockPool(docWriter.byteBlockAllocator, false);
charPool = new CharBlockPool(docWriter);
}

TestByteSlices.java

@@ -0,0 +1,109 @@
package org.apache.lucene.index;
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Random;
import java.util.ArrayList;
import org.apache.lucene.util.LuceneTestCase;
public class TestByteSlices extends LuceneTestCase {
private static class ByteBlockAllocator extends ByteBlockPool.Allocator {
ArrayList freeByteBlocks = new ArrayList();
/* Allocate another byte[] from the shared pool */
synchronized byte[] getByteBlock(boolean trackAllocations) {
final int size = freeByteBlocks.size();
final byte[] b;
if (0 == size)
b = new byte[DocumentsWriter.BYTE_BLOCK_SIZE];
else
b = (byte[]) freeByteBlocks.remove(size-1);
return b;
}
/* Return a byte[] to the pool */
synchronized void recycleByteBlocks(byte[][] blocks, int start, int end) {
for(int i=start;i<end;i++)
freeByteBlocks.add(blocks[i]);
}
}
public void testBasic() throws Throwable {
ByteBlockPool pool = new ByteBlockPool(new ByteBlockAllocator(), false);
final int NUM_STREAM = 25;
ByteSliceWriter writer = new ByteSliceWriter(pool);
int[] starts = new int[NUM_STREAM];
int[] uptos = new int[NUM_STREAM];
int[] counters = new int[NUM_STREAM];
Random r = new Random(1);
ByteSliceReader reader = new ByteSliceReader();
for(int ti=0;ti<100;ti++) {
for(int stream=0;stream<NUM_STREAM;stream++) {
starts[stream] = -1;
counters[stream] = 0;
}
boolean debug = false;
for(int iter=0;iter<10000;iter++) {
int stream = r.nextInt(NUM_STREAM);
if (debug)
System.out.println("write stream=" + stream);
if (starts[stream] == -1) {
final int spot = pool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);
starts[stream] = uptos[stream] = spot + pool.byteOffset;
if (debug)
System.out.println(" init to " + starts[stream]);
}
writer.init(uptos[stream]);
int numValue = r.nextInt(20);
for(int j=0;j<numValue;j++) {
if (debug)
System.out.println(" write " + (counters[stream]+j));
writer.writeVInt(counters[stream]+j);
//writer.writeVInt(ti);
}
counters[stream] += numValue;
uptos[stream] = writer.getAddress();
if (debug)
System.out.println(" addr now " + uptos[stream]);
}
for(int stream=0;stream<NUM_STREAM;stream++) {
if (debug)
System.out.println(" stream=" + stream + " count=" + counters[stream]);
if (starts[stream] != uptos[stream]) {
reader.init(pool, starts[stream], uptos[stream]);
for(int j=0;j<counters[stream];j++)
assertEquals(j, reader.readVInt());
//assertEquals(ti, reader.readVInt());
}
}
pool.reset();
}
}
}

TestStressIndexing2.java

@@ -112,45 +112,48 @@ public class TestStressIndexing2 extends LuceneTestCase {
// everything.
public Map indexRandom(int nThreads, int iterations, int range, Directory dir) throws IOException, InterruptedException {
IndexWriter w = new MockIndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.UNLIMITED);
w.setUseCompoundFile(false);
/***
w.setMaxMergeDocs(Integer.MAX_VALUE);
w.setMaxFieldLength(10000);
w.setRAMBufferSizeMB(1);
w.setMergeFactor(10);
***/
// force many merges
w.setMergeFactor(mergeFactor);
w.setRAMBufferSizeMB(.1);
w.setMaxBufferedDocs(maxBufferedDocs);
threads = new IndexingThread[nThreads];
for (int i=0; i<threads.length; i++) {
IndexingThread th = new IndexingThread();
th.w = w;
th.base = 1000000*i;
th.range = range;
th.iterations = iterations;
threads[i] = th;
}
for (int i=0; i<threads.length; i++) {
threads[i].start();
}
for (int i=0; i<threads.length; i++) {
threads[i].join();
}
// w.optimize();
w.close();
Map docs = new HashMap();
for (int i=0; i<threads.length; i++) {
IndexingThread th = threads[i];
synchronized(th) {
docs.putAll(th.docs);
for(int iter=0;iter<3;iter++) {
IndexWriter w = new MockIndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, IndexWriter.MaxFieldLength.UNLIMITED);
w.setUseCompoundFile(false);
/***
w.setMaxMergeDocs(Integer.MAX_VALUE);
w.setMaxFieldLength(10000);
w.setRAMBufferSizeMB(1);
w.setMergeFactor(10);
***/
// force many merges
w.setMergeFactor(mergeFactor);
w.setRAMBufferSizeMB(.1);
w.setMaxBufferedDocs(maxBufferedDocs);
threads = new IndexingThread[nThreads];
for (int i=0; i<threads.length; i++) {
IndexingThread th = new IndexingThread();
th.w = w;
th.base = 1000000*i;
th.range = range;
th.iterations = iterations;
threads[i] = th;
}
for (int i=0; i<threads.length; i++) {
threads[i].start();
}
for (int i=0; i<threads.length; i++) {
threads[i].join();
}
// w.optimize();
w.close();
for (int i=0; i<threads.length; i++) {
IndexingThread th = threads[i];
synchronized(th) {
docs.putAll(th.docs);
}
}
}
@@ -515,16 +518,16 @@ public class TestStressIndexing2 extends LuceneTestCase {
switch (nextInt(4)) {
case 0:
fields.add(new Field("f0", getString(1), Field.Store.YES, Field.Index.NO_NORMS, tvVal));
fields.add(new Field("f" + nextInt(100), getString(1), Field.Store.YES, Field.Index.NO_NORMS, tvVal));
break;
case 1:
fields.add(new Field("f1", getString(0), Field.Store.NO, Field.Index.TOKENIZED, tvVal));
fields.add(new Field("f" + nextInt(100), getString(0), Field.Store.NO, Field.Index.TOKENIZED, tvVal));
break;
case 2:
fields.add(new Field("f2", getString(0), Field.Store.YES, Field.Index.NO, Field.TermVector.NO));
fields.add(new Field("f" + nextInt(100), getString(0), Field.Store.YES, Field.Index.NO, Field.TermVector.NO));
break;
case 3:
fields.add(new Field("f3", getString(bigFieldSize), Field.Store.YES, Field.Index.TOKENIZED, tvVal));
fields.add(new Field("f" + nextInt(100), getString(bigFieldSize), Field.Store.YES, Field.Index.TOKENIZED, tvVal));
break;
}
}