Mirror of https://github.com/apache/lucene.git

LUCENE-5189: factor out SegmentDocValues from SegmentReader

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1535526 13f79535-47bb-0310-9956-ffa450edef68

commit d37b2cb90b, parent b6eab2b39a
The commit adds a new class, SegmentDocValues, which takes over the per-generation DocValuesProducer management that SegmentReader previously did inline.

SegmentDocValues.java (new file, `@@ -0,0 +1,108 @@`):

```java
package org.apache.lucene.index;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.RefCount;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Manages the {@link DocValuesProducer} held by {@link SegmentReader} and
 * keeps track of their reference counting.
 */
final class SegmentDocValues {

  private final Map<Long,RefCount<DocValuesProducer>> genDVProducers = new HashMap<Long,RefCount<DocValuesProducer>>();

  private RefCount<DocValuesProducer> newDocValuesProducer(SegmentInfoPerCommit si, IOContext context, Directory dir,
      DocValuesFormat dvFormat, final Long gen, List<FieldInfo> infos) throws IOException {
    Directory dvDir = dir;
    String segmentSuffix = "";
    if (gen.longValue() != -1) {
      dvDir = si.info.dir; // gen'd files are written outside CFS, so use SegInfo directory
      segmentSuffix = Long.toString(gen.longValue(), Character.MAX_RADIX);
    }

    // set SegmentReadState to list only the fields that are relevant to that gen
    SegmentReadState srs = new SegmentReadState(dvDir, si.info, new FieldInfos(infos.toArray(new FieldInfo[infos.size()])), context, segmentSuffix);
    return new RefCount<DocValuesProducer>(dvFormat.fieldsProducer(srs)) {
      @SuppressWarnings("synthetic-access")
      @Override
      protected void release() throws IOException {
        object.close();
        synchronized (SegmentDocValues.this) {
          genDVProducers.remove(gen);
        }
      }
    };
  }

  /** Returns the {@link DocValuesProducer} for the given generation. */
  synchronized DocValuesProducer getDocValuesProducer(long gen, SegmentInfoPerCommit si, IOContext context, Directory dir,
      DocValuesFormat dvFormat, List<FieldInfo> infos) throws IOException {
    RefCount<DocValuesProducer> dvp = genDVProducers.get(gen);
    if (dvp == null) {
      dvp = newDocValuesProducer(si, context, dir, dvFormat, gen, infos);
      assert dvp != null;
      genDVProducers.put(gen, dvp);
    } else {
      dvp.incRef();
    }
    return dvp.get();
  }

  /**
   * Decrement the reference count of the given {@link DocValuesProducer}
   * generations.
   */
  synchronized void decRef(List<Long> dvProducersGens) throws IOException {
    Throwable t = null;
    for (Long gen : dvProducersGens) {
      RefCount<DocValuesProducer> dvp = genDVProducers.get(gen);
      assert dvp != null : "gen=" + gen;
      try {
        dvp.decRef();
      } catch (Throwable th) {
        if (t == null) { // record only the first failure, but keep releasing the rest
          t = th;
        }
      }
    }
    if (t != null) {
      IOUtils.reThrow(t);
    }
  }

  /** Returns approximate RAM bytes used. */
  synchronized long ramBytesUsed() {
    long ramBytesUsed = 0;
    for (RefCount<DocValuesProducer> dvp : genDVProducers.values()) {
      ramBytesUsed += dvp.get().ramBytesUsed();
    }
    return ramBytesUsed;
  }
}
```
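Taken together, the class is a per-generation cache of ref-counted producers: the first request for a generation opens the producer, later requests share it, and the last decRef closes it and evicts it from the map. Below is a minimal sketch of that contract from a caller's point of view; the caller class and its parameters are hypothetical, not part of the commit (SegmentDocValues is package-private, so a real caller lives in org.apache.lucene.index, as SegmentReader does below).

```java
package org.apache.lucene.index;

import java.util.Collections;
import java.util.List;

import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;

// Hypothetical caller illustrating the acquire/share/release contract.
class SegmentDocValuesUsageSketch {

  static void sketch(SegmentDocValues segDocValues, SegmentInfoPerCommit si, Directory dir,
      DocValuesFormat dvFormat, long gen, List<FieldInfo> infos) throws Exception {
    // First request for this gen opens the producer (refCount == 1)...
    DocValuesProducer a = segDocValues.getDocValuesProducer(gen, si, IOContext.READ, dir, dvFormat, infos);
    // ...a second request for the same gen shares it (refCount == 2).
    DocValuesProducer b = segDocValues.getDocValuesProducer(gen, si, IOContext.READ, dir, dvFormat, infos);
    assert a == b;

    // Every acquisition is paired with exactly one decRef. The last decRef
    // for a gen runs release(): the producer is closed and the gen is
    // removed from genDVProducers.
    segDocValues.decRef(Collections.singletonList(gen));
    segDocValues.decRef(Collections.singletonList(gen));
  }
}
```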
SegmentReader.java:

```diff
@@ -21,8 +21,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.codecs.DocValuesFormat;
```
```diff
@@ -36,8 +36,6 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.CloseableThreadLocal;
-import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.RefCount;
 
 /**
  * IndexReader implementation over a single segment.
```
```diff
@@ -48,18 +46,6 @@ import org.apache.lucene.util.RefCount;
  */
 public final class SegmentReader extends AtomicReader {
 
-  private static final class DocValuesRefCount extends RefCount<DocValuesProducer> {
-
-    public DocValuesRefCount(DocValuesProducer object) {
-      super(object);
-    }
-
-    @Override
-    protected void release() throws IOException {
-      object.close();
-    }
-  }
-
   private final SegmentInfoPerCommit si;
   private final Bits liveDocs;
 
```
```diff
@@ -69,7 +55,8 @@ public final class SegmentReader extends AtomicReader {
   private final int numDocs;
 
   final SegmentCoreReaders core;
+  final SegmentDocValues segDocValues;
 
   final CloseableThreadLocal<Map<String,Object>> docValuesLocal = new CloseableThreadLocal<Map<String,Object>>() {
     @Override
     protected Map<String,Object> initialValue() {
```
```diff
@@ -85,9 +72,10 @@ public final class SegmentReader extends AtomicReader {
   };
 
   final Map<String,DocValuesProducer> dvProducers = new HashMap<String,DocValuesProducer>();
-  final Map<Long,RefCount<DocValuesProducer>> genDVProducers = new HashMap<Long,RefCount<DocValuesProducer>>();
 
   final FieldInfos fieldInfos;
 
+  private final List<Long> dvGens = new ArrayList<>();
+
   /**
    * Constructs a new SegmentReader with a new core.
```
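The reader now keeps two views onto the same producers: dvProducers maps each field name to the producer that serves it, while dvGens records which generations this reader acquired so doClose() can return exactly those to segDocValues (see the decRef call further down). A hedged sketch of a per-field lookup against dvProducers; the helper name is hypothetical, simplified from what the reader's doc-values accessors do:

```java
// Hypothetical lookup helper: one producer may serve many fields, but
// every field name resolves to exactly one producer in dvProducers.
private NumericDocValues loadNumericDocValues(FieldInfo fi) throws IOException {
  DocValuesProducer dvp = dvProducers.get(fi.name);
  assert dvp != null : "no DocValuesProducer for field=" + fi.name;
  return dvp.getNumeric(fi);
}
```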
```diff
@@ -105,7 +93,8 @@ public final class SegmentReader extends AtomicReader {
     // constructors don't allow returning two things...
     fieldInfos = readFieldInfos(si);
     core = new SegmentCoreReaders(this, si.info.dir, si, context);
+    segDocValues = new SegmentDocValues();
 
     boolean success = false;
     final Codec codec = si.info.getCodec();
     try {
```
```diff
@@ -119,27 +108,7 @@ public final class SegmentReader extends AtomicReader {
       numDocs = si.info.getDocCount() - si.getDelCount();
 
       if (fieldInfos.hasDocValues()) {
-        final Directory dir = core.cfsReader != null ? core.cfsReader : si.info.dir;
-        final DocValuesFormat dvFormat = codec.docValuesFormat();
-        // initialize the per generation numericDVProducers and put the correct
-        // DVProducer for each field
-        final Map<Long,List<FieldInfo>> genInfos = getGenInfos();
-
-        // System.out.println("[" + Thread.currentThread().getName() + "] SR.init: new reader: " + si + "; gens=" + genInfos.keySet());
-
-        for (Entry<Long,List<FieldInfo>> e : genInfos.entrySet()) {
-          Long gen = e.getKey();
-          List<FieldInfo> infos = e.getValue();
-          RefCount<DocValuesProducer> dvp = genDVProducers.get(gen);
-          if (dvp == null) {
-            dvp = newDocValuesProducer(si, context, dir, dvFormat, gen, infos);
-            assert dvp != null;
-            genDVProducers.put(gen, dvp);
-          }
-          for (FieldInfo fi : infos) {
-            dvProducers.put(fi.name, dvp.get());
-          }
-        }
+        initDocValuesProducers(codec);
       }
 
       success = true;
```
```diff
@@ -174,6 +143,7 @@ public final class SegmentReader extends AtomicReader {
     this.numDocs = numDocs;
     this.core = sr.core;
     core.incRef();
+    this.segDocValues = sr.segDocValues;
 
     // System.out.println("[" + Thread.currentThread().getName() + "] SR.init: sharing reader: " + sr + " for gens=" + sr.genDVProducers.keySet());
 
```
```diff
@@ -188,34 +158,7 @@ public final class SegmentReader extends AtomicReader {
     }
 
     if (fieldInfos.hasDocValues()) {
-      final Directory dir = core.cfsReader != null ? core.cfsReader : si.info.dir;
-
-      final DocValuesFormat dvFormat = codec.docValuesFormat();
-      final Map<Long,List<FieldInfo>> genInfos = getGenInfos();
-
-      for (Entry<Long,List<FieldInfo>> e : genInfos.entrySet()) {
-        Long gen = e.getKey();
-        List<FieldInfo> infos = e.getValue();
-        RefCount<DocValuesProducer> dvp = genDVProducers.get(gen);
-        if (dvp == null) {
-          // check if this DVP gen is used by the given reader
-          dvp = sr.genDVProducers.get(gen);
-          if (dvp != null) {
-            // gen used by given reader, incRef its DVP
-            dvp.incRef();
-            // System.out.println("[" + Thread.currentThread().getName() + "] SR.init: sharing DVP for gen=" + gen + " refCount=" + dvp.getRefCount());
-          } else {
-            // this gen is not used by given reader, initialize a new one
-            dvp = newDocValuesProducer(si, IOContext.READ, dir, dvFormat, gen, infos);
-            // System.out.println("[" + Thread.currentThread().getName() + "] SR.init: new DVP for gen=" + gen + " refCount=" + dvp.getRefCount());
-          }
-          assert dvp != null;
-          genDVProducers.put(gen, dvp);
-        }
-        for (FieldInfo fi : infos) {
-          dvProducers.put(fi.name, dvp.get());
-        }
-      }
+      initDocValuesProducers(codec);
     }
     success = true;
   } finally {
```
```diff
@@ -225,6 +168,26 @@ public final class SegmentReader extends AtomicReader {
     }
   }
 
+  // initialize the per-field DocValuesProducer
+  private void initDocValuesProducers(Codec codec) throws IOException {
+    final Directory dir = core.cfsReader != null ? core.cfsReader : si.info.dir;
+    final DocValuesFormat dvFormat = codec.docValuesFormat();
+    final Map<Long,List<FieldInfo>> genInfos = getGenInfos();
+
+    // System.out.println("[" + Thread.currentThread().getName() + "] SR.initDocValuesProducers: segInfo=" + si + "; gens=" + genInfos.keySet());
+
+    for (Entry<Long,List<FieldInfo>> e : genInfos.entrySet()) {
+      Long gen = e.getKey();
+      List<FieldInfo> infos = e.getValue();
+      DocValuesProducer dvp = segDocValues.getDocValuesProducer(gen, si, IOContext.READ, dir, dvFormat, infos);
+      for (FieldInfo fi : infos) {
+        dvProducers.put(fi.name, dvp);
+      }
+    }
+
+    dvGens.addAll(genInfos.keySet());
+  }
+
   /**
    * Reads the most recent {@link FieldInfos} of the given segment info.
    *
```
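getGenInfos() itself is outside this diff; for context, here is a plausible shape of the grouping it performs, assuming FieldInfo.getDocValuesGen() reports a field's doc-values generation and -1 means the values were written with the segment. Treat this as a sketch, not the committed code:

```java
// Sketch of the gen bucketing consumed by initDocValuesProducers():
// group this reader's doc-values fields by their docValuesGen.
private Map<Long,List<FieldInfo>> getGenInfosSketch() {
  Map<Long,List<FieldInfo>> genInfos = new HashMap<Long,List<FieldInfo>>();
  for (FieldInfo fi : fieldInfos) {
    if (!fi.hasDocValues()) {
      continue; // only doc-values fields need a producer
    }
    long gen = fi.getDocValuesGen(); // -1 = written with the segment
    List<FieldInfo> infos = genInfos.get(gen);
    if (infos == null) {
      infos = new ArrayList<FieldInfo>();
      genInfos.put(gen, infos);
    }
    infos.add(fi);
  }
  return genInfos;
}
```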
```diff
@@ -274,20 +237,6 @@ public final class SegmentReader extends AtomicReader {
     return genInfos;
   }
 
-  private RefCount<DocValuesProducer> newDocValuesProducer(SegmentInfoPerCommit si, IOContext context, Directory dir,
-      DocValuesFormat dvFormat, Long gen, List<FieldInfo> infos) throws IOException {
-    Directory dvDir = dir;
-    String segmentSuffix = "";
-    if (gen.longValue() != -1) {
-      dvDir = si.info.dir; // gen'd files are written outside CFS, so use SegInfo directory
-      segmentSuffix = Long.toString(gen.longValue(), Character.MAX_RADIX);
-    }
-
-    // set SegmentReadState to list only the fields that are relevant to that gen
-    SegmentReadState srs = new SegmentReadState(dvDir, si.info, new FieldInfos(infos.toArray(new FieldInfo[infos.size()])), context, segmentSuffix);
-    return new DocValuesRefCount(dvFormat.fieldsProducer(srs));
-  }
-
   @Override
   public Bits getLiveDocs() {
     ensureOpen();
```
```diff
@@ -300,21 +249,10 @@ public final class SegmentReader extends AtomicReader {
     try {
       core.decRef();
     } finally {
-      Throwable t = null;
-      for (RefCount<DocValuesProducer> dvp : genDVProducers.values()) {
-        try {
-          dvp.decRef();
-        } catch (Throwable th) {
-          if (t != null) {
-            t = th;
-          }
-        }
-      }
-      genDVProducers.clear();
       dvProducers.clear();
       docValuesLocal.close();
       docsWithFieldLocal.close();
-      IOUtils.reThrow(t);
+      segDocValues.decRef(dvGens);
     }
   }
 
```
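Because a reader created from an existing one (the sharing constructor above, `this.segDocValues = sr.segDocValues`) reuses the same SegmentDocValues, a producer for a given generation is opened once and closed only when the last sharing reader closes. The counting rests on org.apache.lucene.util.RefCount; below is a small standalone demo of the semantics this relies on, under the assumption that the count starts at 1 and release() fires exactly when it reaches 0:

```java
import org.apache.lucene.util.RefCount;

// Demo of the RefCount behavior SegmentDocValues builds on: the object
// is released exactly once, when the final reference is dropped.
public class RefCountDemo {
  public static void main(String[] args) throws Exception {
    RefCount<String> rc = new RefCount<String>("producer-for-gen-5") {
      @Override
      protected void release() {
        System.out.println("released " + object); // last decRef landed here
      }
    };
    rc.incRef();  // a second reader starts sharing -> count 2
    rc.decRef();  // first reader closes            -> count 1, still open
    rc.decRef();  // second reader closes           -> count 0, release() runs
  }
}
```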
```diff
@@ -603,18 +541,13 @@ public final class SegmentReader extends AtomicReader {
     core.removeCoreClosedListener(listener);
   }
 
-  private long dvRamBytesUsed() {
-    long ramBytesUsed = 0;
-    for (RefCount<DocValuesProducer> dvp : genDVProducers.values()) {
-      ramBytesUsed += dvp.get().ramBytesUsed();
-    }
-    return ramBytesUsed;
-  }
-
   /** Returns approximate RAM Bytes used */
   public long ramBytesUsed() {
     ensureOpen();
-    long ramBytesUsed = dvRamBytesUsed();
+    long ramBytesUsed = 0;
+    if (segDocValues != null) {
+      ramBytesUsed += segDocValues.ramBytesUsed();
+    }
     if (core != null) {
       ramBytesUsed += core.ramBytesUsed();
     }
```
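One consequence of the shared map: every reader sharing a SegmentDocValues reports the sum over all generations currently held in it, including generations acquired only by a sibling reader, so adding per-reader totals double-counts the shared producers. A sketch of the pitfall; reader1 and reader2 are hypothetical, with reader2 reopened from reader1:

```java
// Both readers share one SegmentDocValues, so each total already
// includes the full set of live doc-values producers.
long r1 = reader1.ramBytesUsed();
long r2 = reader2.ramBytesUsed();
// r1 + r2 counts the shared producers twice; de-duplicate before
// aggregating across readers that share a segment.
```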