Grouping segments during flushing

This commit is contained in:
RS146BIJAY 2024-04-11 13:01:29 +05:30
parent 13cf882677
commit bb55c6b374
11 changed files with 372 additions and 38 deletions

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Decorator merge policy over the existing {@link TieredMergePolicy}. During a segment merge, this
 * policy categorizes segments according to their grouping function outcome (recorded in the
 * {@code dwptGroupNumber} segment attribute) and only asks the tiered policy for merges among
 * segments of the same category, thus maintaining the integrity of the grouping criteria
 * throughout the merge process.
 *
 * @lucene.experimental
 */
public class CriteriaBasedGroupingTieredMergePolicy extends TieredMergePolicy {

  /** Name of the segment attribute that stores the DWPT group a segment was flushed from. */
  private static final String DWPT_GROUP_NUMBER_ATTRIBUTE = "dwptGroupNumber";

  /**
   * Finds merges separately for every DWPT group, so that a proposed merge never combines
   * segments that belong to different groups.
   *
   * @param mergeTrigger the event that triggered the merge
   * @param infos the segments of the index
   * @param mergeContext the merge context; its currently merging segments are left out
   * @return the union of the merges selected for each group, or {@code null} if no group
   *     produced a merge
   * @throws IOException if the delegate policy fails while selecting merges
   */
  @Override
  public MergeSpecification findMerges(
      MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException {
    final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();

    // Bucket every segment that is not already being merged by its group attribute. Segments
    // for which no attribute value is returned end up together under a single (null) key.
    final Map<String, List<SegmentCommitInfo>> segmentsByGroup = new HashMap<>();
    for (SegmentCommitInfo si : infos) {
      if (merging.contains(si)) {
        continue;
      }
      final String dwptGroupNumber = si.info.getAttribute(DWPT_GROUP_NUMBER_ATTRIBUTE);
      segmentsByGroup.computeIfAbsent(dwptGroupNumber, k -> new ArrayList<>()).add(si);
    }

    MergeSpecification spec = null;
    for (List<SegmentCommitInfo> groupSegments : segmentsByGroup.values()) {
      if (groupSegments.size() <= 1) {
        // A group needs at least two segments to be considered here.
        continue;
      }
      final SegmentInfos groupInfos = new SegmentInfos(infos.getIndexCreatedVersionMajor());
      for (SegmentCommitInfo info : groupSegments) {
        groupInfos.add(info);
      }
      // Delegate with the per-group view only. Passing the complete `infos` here would let the
      // tiered policy mix segments of different groups and would add the same merges once per
      // group.
      final MergeSpecification groupSpec =
          super.findMerges(mergeTrigger, groupInfos, mergeContext);
      if (groupSpec != null) {
        if (spec == null) {
          spec = new MergeSpecification();
        }
        spec.merges.addAll(groupSpec.merges);
      }
    }
    return spec;
  }
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.util.function.Function;
/** Structure of the criteria on the basis of which the DWPT group of a segment is selected. */
public class DWPTGroupingCriteriaDefinition {

  /**
   * Group number of the default DWPT group. DocumentsWriter falls back to this group for
   * documents whose grouping function outcome reaches or exceeds the max group limit.
   */
  public static final int DEFAULT_DWPT_GROUP_NUMBER = -1;

  /** Default criteria definition: every document is assigned to the default DWPT group. */
  public static final DWPTGroupingCriteriaDefinition DEFAULT_DWPT_GROUPING_CRITERIA_DEFINITION =
      new DWPTGroupingCriteriaDefinition(docs -> DEFAULT_DWPT_GROUP_NUMBER, 1);

  /** Grouping function which determines the DWPT group using which documents will be indexed. */
  private final Function<Iterable<? extends Iterable<? extends IndexableField>>, Integer>
      dwptGroupingCriteriaFunction;

  /**
   * Upper limit on the DWPT group number. Documents evaluated to a group number at or above this
   * limit are indexed using the default DWPT group.
   */
  private final int maxDWPTGroupSize;

  /**
   * Creates a DWPTGroupingCriteriaDefinition from a criteria function and a max DWPT group size.
   *
   * @param dwptGroupingCriteriaFunction the grouping criteria function.
   * @param maxDWPTGroupSize maximum number of groups allowed by grouping criteria function.
   */
  public DWPTGroupingCriteriaDefinition(
      final Function<Iterable<? extends Iterable<? extends IndexableField>>, Integer>
          dwptGroupingCriteriaFunction,
      final int maxDWPTGroupSize) {
    this.maxDWPTGroupSize = maxDWPTGroupSize;
    this.dwptGroupingCriteriaFunction = dwptGroupingCriteriaFunction;
  }

  /**
   * Returns the grouping criteria function.
   *
   * @return the grouping criteria function.
   */
  public Function<Iterable<? extends Iterable<? extends IndexableField>>, Integer>
      getDwptGroupingCriteriaFunction() {
    return this.dwptGroupingCriteriaFunction;
  }

  /**
   * Returns the max number of groups allowed for this grouping criteria function.
   *
   * @return the max number of groups allowed for this grouping criteria function.
   */
  public int getMaxDWPTGroupSize() {
    return this.maxDWPTGroupSize;
  }
}

View File

@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.function.ToLongFunction; import java.util.function.ToLongFunction;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment; import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
@ -50,7 +51,7 @@ import org.apache.lucene.util.InfoStream;
* <p>Threads: * <p>Threads:
* *
* <p>Multiple threads are allowed into addDocument at once. There is an initial synchronized call * <p>Multiple threads are allowed into addDocument at once. There is an initial synchronized call
* to {@link DocumentsWriterFlushControl#obtainAndLock()} which allocates a DWPT for this indexing * to {@link DocumentsWriterFlushControl#obtainAndLock} which allocates a DWPT for this indexing
* thread. The same thread will not necessarily get the same DWPT over time. Then updateDocuments is * thread. The same thread will not necessarily get the same DWPT over time. Then updateDocuments is
* called on that DWPT without synchronization (most of the "heavy lifting" is in this call). Once a * called on that DWPT without synchronization (most of the "heavy lifting" is in this call). Once a
* DWPT fills up enough RAM or hold enough documents in memory the DWPT is checked out for flush and * DWPT fills up enough RAM or hold enough documents in memory the DWPT is checked out for flush and
@ -119,7 +120,7 @@ final class DocumentsWriter implements Closeable, Accountable {
this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream); this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream);
this.perThreadPool = this.perThreadPool =
new DocumentsWriterPerThreadPool( new DocumentsWriterPerThreadPool(
() -> { (dwptGroupNumber) -> {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap); final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
return new DocumentsWriterPerThread( return new DocumentsWriterPerThread(
indexCreatedVersionMajor, indexCreatedVersionMajor,
@ -130,7 +131,8 @@ final class DocumentsWriter implements Closeable, Accountable {
deleteQueue, deleteQueue,
infos, infos,
pendingNumDocs, pendingNumDocs,
enableTestPoints); enableTestPoints,
dwptGroupNumber);
}); });
this.pendingNumDocs = pendingNumDocs; this.pendingNumDocs = pendingNumDocs;
flushControl = new DocumentsWriterFlushControl(this, config); flushControl = new DocumentsWriterFlushControl(this, config);
@ -412,7 +414,7 @@ final class DocumentsWriter implements Closeable, Accountable {
throws IOException { throws IOException {
boolean hasEvents = preUpdate(); boolean hasEvents = preUpdate();
final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock(); final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock(getDWPTGroupNumber(docs));
final DocumentsWriterPerThread flushingDWPT; final DocumentsWriterPerThread flushingDWPT;
long seqNo; long seqNo;
@ -453,6 +455,29 @@ final class DocumentsWriter implements Closeable, Accountable {
return false; return false;
} }
/**
 * Fetches the DWPT group number for a given list of docs. The configured grouping criteria
 * function is applied to the docs; if it yields no value ({@code null}) or a group number
 * greater than or equal to the configured max DWPT group size, the default group is returned.
 *
 * @param docs the passed list of docs.
 * @return dwpt group number for a given list of docs
 */
private int getDWPTGroupNumber(
    final Iterable<? extends Iterable<? extends IndexableField>> docs) {
  final DWPTGroupingCriteriaDefinition dwptGroupingCriteriaDefinition =
      config.getDwptGroupingCriteriaDefinition();
  final Function<Iterable<? extends Iterable<? extends IndexableField>>, Integer>
      dwptGroupingCriteriaFunction =
          dwptGroupingCriteriaDefinition.getDwptGroupingCriteriaFunction();
  final int dwptGroupLimit = dwptGroupingCriteriaDefinition.getMaxDWPTGroupSize();
  // Keep the boxed result: unboxing a null outcome directly would fail with a bare
  // NullPointerException in the middle of indexing.
  final Integer dwptGroupNumber = dwptGroupingCriteriaFunction.apply(docs);
  if (dwptGroupNumber == null || dwptGroupNumber >= dwptGroupLimit) {
    return DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER;
  }
  return dwptGroupNumber;
}
private void doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { private void doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
assert flushingDWPT != null : "Flushing DWPT must not be null"; assert flushingDWPT != null : "Flushing DWPT must not be null";
do { do {

View File

@ -521,9 +521,9 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
flushDeletes.set(true); flushDeletes.set(true);
} }
DocumentsWriterPerThread obtainAndLock() { DocumentsWriterPerThread obtainAndLock(int dwptGroupNumber) {
while (closed == false) { while (closed == false) {
final DocumentsWriterPerThread perThread = perThreadPool.getAndLock(); final DocumentsWriterPerThread perThread = perThreadPool.getAndLock(dwptGroupNumber);
if (perThread.deleteQueue == documentsWriter.deleteQueue) { if (perThread.deleteQueue == documentsWriter.deleteQueue) {
// simply return the DWPT even in a flush all case since we already hold the lock and the // simply return the DWPT even in a flush all case since we already hold the lock and the
// DWPT is not stale // DWPT is not stale

View File

@ -52,6 +52,7 @@ import org.apache.lucene.util.Version;
final class DocumentsWriterPerThread implements Accountable, Lock { final class DocumentsWriterPerThread implements Accountable, Lock {
private Throwable abortingException; private Throwable abortingException;
public final int dwptGroupNumber;
private void onAbortingException(Throwable throwable) { private void onAbortingException(Throwable throwable) {
assert throwable != null : "aborting exception must not be null"; assert throwable != null : "aborting exception must not be null";
@ -151,7 +152,8 @@ final class DocumentsWriterPerThread implements Accountable, Lock {
DocumentsWriterDeleteQueue deleteQueue, DocumentsWriterDeleteQueue deleteQueue,
FieldInfos.Builder fieldInfos, FieldInfos.Builder fieldInfos,
AtomicLong pendingNumDocs, AtomicLong pendingNumDocs,
boolean enableTestPoints) { boolean enableTestPoints,
int dwptGroupNumber) {
this.indexMajorVersionCreated = indexMajorVersionCreated; this.indexMajorVersionCreated = indexMajorVersionCreated;
this.directory = new TrackingDirectoryWrapper(directory); this.directory = new TrackingDirectoryWrapper(directory);
this.fieldInfos = fieldInfos; this.fieldInfos = fieldInfos;
@ -163,6 +165,7 @@ final class DocumentsWriterPerThread implements Accountable, Lock {
this.deleteQueue = Objects.requireNonNull(deleteQueue); this.deleteQueue = Objects.requireNonNull(deleteQueue);
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM; assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
deleteSlice = deleteQueue.newSlice(); deleteSlice = deleteQueue.newSlice();
this.dwptGroupNumber = dwptGroupNumber;
segmentInfo = segmentInfo =
new SegmentInfo( new SegmentInfo(
@ -178,6 +181,7 @@ final class DocumentsWriterPerThread implements Accountable, Lock {
StringHelper.randomId(), StringHelper.randomId(),
Collections.emptyMap(), Collections.emptyMap(),
indexWriterConfig.getIndexSort()); indexWriterConfig.getIndexSort());
segmentInfo.putAttribute("dwptGroupNumber", String.valueOf(dwptGroupNumber));
assert numDocsInRAM == 0; assert numDocsInRAM == 0;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) { if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message( infoStream.message(

View File

@ -22,9 +22,11 @@ import java.util.Collections;
import java.util.IdentityHashMap; import java.util.IdentityHashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
@ -42,21 +44,25 @@ import org.apache.lucene.util.ThreadInterruptedException;
*/ */
final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerThread>, Closeable { final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerThread>, Closeable {
private final Set<DocumentsWriterPerThread> dwpts = private final Map<Integer, Set<DocumentsWriterPerThread>> dwpts = new ConcurrentHashMap<>();
Collections.newSetFromMap(new IdentityHashMap<>()); private final Map<Integer, LockableConcurrentApproximatePriorityQueue<DocumentsWriterPerThread>>
private final LockableConcurrentApproximatePriorityQueue<DocumentsWriterPerThread> freeList = freeList = new ConcurrentHashMap<>();
new LockableConcurrentApproximatePriorityQueue<>(); private final Function<Integer, DocumentsWriterPerThread> dwptFactory;
private final Supplier<DocumentsWriterPerThread> dwptFactory;
private int takenWriterPermits = 0; private int takenWriterPermits = 0;
private volatile boolean closed; private volatile boolean closed;
DocumentsWriterPerThreadPool(Supplier<DocumentsWriterPerThread> dwptFactory) { DocumentsWriterPerThreadPool(Function<Integer, DocumentsWriterPerThread> dwptFactory) {
this.dwptFactory = dwptFactory; this.dwptFactory = dwptFactory;
} }
/** Returns the active number of {@link DocumentsWriterPerThread} instances. */ /** Returns the active number of {@link DocumentsWriterPerThread} instances. */
synchronized int size() { synchronized int size() {
return dwpts.size(); int si = 0;
for (int dwptGroupNumber : dwpts.keySet()) {
si += dwpts.get(dwptGroupNumber).size();
}
return si;
} }
synchronized void lockNewWriters() { synchronized void lockNewWriters() {
@ -82,7 +88,7 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
* *
* @return a new {@link DocumentsWriterPerThread} * @return a new {@link DocumentsWriterPerThread}
*/ */
private synchronized DocumentsWriterPerThread newWriter() { private synchronized DocumentsWriterPerThread newWriter(int dwptGroupNumber) {
assert takenWriterPermits >= 0; assert takenWriterPermits >= 0;
while (takenWriterPermits > 0) { while (takenWriterPermits > 0) {
// we can't create new DWPTs while not all permits are available // we can't create new DWPTs while not all permits are available
@ -99,12 +105,20 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
// end of the world it's violating the contract that we don't release any new DWPT after this // end of the world it's violating the contract that we don't release any new DWPT after this
// pool is closed // pool is closed
ensureOpen(); ensureOpen();
DocumentsWriterPerThread dwpt = dwptFactory.get(); DocumentsWriterPerThread dwpt = dwptFactory.apply(dwptGroupNumber);
dwpt.lock(); // lock so nobody else will get this DWPT dwpt.lock(); // lock so nobody else will get this DWPT
dwpts.add(dwpt); getDwpts(dwptGroupNumber).add(dwpt);
return dwpt; return dwpt;
} }
/**
 * Returns the set of DWPTs registered for the given group, registering an empty identity-based
 * set for the group first if none exists yet.
 */
private synchronized Set<DocumentsWriterPerThread> getDwpts(int dwptGroupNumber) {
  return dwpts.computeIfAbsent(
      dwptGroupNumber, group -> Collections.newSetFromMap(new IdentityHashMap<>()));
}
// TODO: maybe we should try to do load leveling here: we want roughly even numbers // TODO: maybe we should try to do load leveling here: we want roughly even numbers
// of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing // of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing
@ -112,9 +126,9 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
* This method is used by DocumentsWriter/FlushControl to obtain a DWPT to do an indexing * This method is used by DocumentsWriter/FlushControl to obtain a DWPT to do an indexing
* operation (add/updateDocument). * operation (add/updateDocument).
*/ */
DocumentsWriterPerThread getAndLock() { DocumentsWriterPerThread getAndLock(final int dwptGroupNumber) {
ensureOpen(); ensureOpen();
DocumentsWriterPerThread dwpt = freeList.lockAndPoll(); DocumentsWriterPerThread dwpt = getFreeList(dwptGroupNumber).lockAndPoll();
if (dwpt != null) { if (dwpt != null) {
return dwpt; return dwpt;
} }
@ -123,7 +137,16 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
// `freeList` at this point, it will be added later on once DocumentsWriter has indexed a // `freeList` at this point, it will be added later on once DocumentsWriter has indexed a
// document into this DWPT and then gives it back to the pool by calling // document into this DWPT and then gives it back to the pool by calling
// #marksAsFreeAndUnlock. // #marksAsFreeAndUnlock.
return newWriter(); return newWriter(dwptGroupNumber);
}
private LockableConcurrentApproximatePriorityQueue<DocumentsWriterPerThread> getFreeList(
final int dwptGroupNumber) {
if (!freeList.containsKey(dwptGroupNumber)) {
freeList.put(dwptGroupNumber, new LockableConcurrentApproximatePriorityQueue<>());
}
return freeList.get(dwptGroupNumber);
} }
private void ensureOpen() { private void ensureOpen() {
@ -133,20 +156,25 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
} }
private synchronized boolean contains(DocumentsWriterPerThread state) { private synchronized boolean contains(DocumentsWriterPerThread state) {
return dwpts.contains(state); return getDwpts(state.dwptGroupNumber).contains(state);
} }
void marksAsFreeAndUnlock(DocumentsWriterPerThread state) { void marksAsFreeAndUnlock(DocumentsWriterPerThread state) {
final long ramBytesUsed = state.ramBytesUsed(); final long ramBytesUsed = state.ramBytesUsed();
assert contains(state) assert contains(state)
: "we tried to add a DWPT back to the pool but the pool doesn't know about this DWPT"; : "we tried to add a DWPT back to the pool but the pool doesn't know about this DWPT";
freeList.addAndUnlock(state, ramBytesUsed); getFreeList(state.dwptGroupNumber).addAndUnlock(state, ramBytesUsed);
} }
@Override @Override
public synchronized Iterator<DocumentsWriterPerThread> iterator() { public synchronized Iterator<DocumentsWriterPerThread> iterator() {
// copy on read - this is a quick op since num states is low // copy on read - this is a quick op since num states is low
return List.copyOf(dwpts).iterator(); List<DocumentsWriterPerThread> list = new ArrayList<>();
for (int groupId : dwpts.keySet()) {
list.addAll(List.copyOf(dwpts.get(groupId)));
}
return list.iterator();
} }
/** /**
@ -182,10 +210,10 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
// #getAndLock cannot pull this DWPT out of the pool since #getAndLock does a DWPT#tryLock to // #getAndLock cannot pull this DWPT out of the pool since #getAndLock does a DWPT#tryLock to
// check if the DWPT is available. // check if the DWPT is available.
assert perThread.isHeldByCurrentThread(); assert perThread.isHeldByCurrentThread();
if (dwpts.remove(perThread)) { if (getDwpts(perThread.dwptGroupNumber).remove(perThread)) {
freeList.remove(perThread); getFreeList(perThread.dwptGroupNumber).remove(perThread);
} else { } else {
assert freeList.contains(perThread) == false; assert getFreeList(perThread.dwptGroupNumber).contains(perThread) == false;
return false; return false;
} }
return true; return true;
@ -193,7 +221,7 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
/** Returns <code>true</code> if this DWPT is still part of the pool */ /** Returns <code>true</code> if this DWPT is still part of the pool */
synchronized boolean isRegistered(DocumentsWriterPerThread perThread) { synchronized boolean isRegistered(DocumentsWriterPerThread perThread) {
return dwpts.contains(perThread); return getDwpts(perThread.dwptGroupNumber).contains(perThread);
} }
@Override @Override

View File

@ -116,6 +116,9 @@ public class LiveIndexWriterConfig {
/** The IndexWriter event listener to record key events * */ /** The IndexWriter event listener to record key events * */
protected IndexWriterEventListener eventListener; protected IndexWriterEventListener eventListener;
/** DWPT criteria definition used to select pool group from which DWPT will be selected. */
protected DWPTGroupingCriteriaDefinition dwptGroupingCriteriaDefinition;
// used by IndexWriterConfig // used by IndexWriterConfig
LiveIndexWriterConfig(Analyzer analyzer) { LiveIndexWriterConfig(Analyzer analyzer) {
this.analyzer = analyzer; this.analyzer = analyzer;
@ -139,6 +142,8 @@ public class LiveIndexWriterConfig {
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB; perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
maxFullFlushMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS; maxFullFlushMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS;
eventListener = IndexWriterEventListener.NO_OP_LISTENER; eventListener = IndexWriterEventListener.NO_OP_LISTENER;
dwptGroupingCriteriaDefinition =
DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUPING_CRITERIA_DEFINITION;
} }
/** Returns the default analyzer to use for indexing documents. */ /** Returns the default analyzer to use for indexing documents. */
@ -377,6 +382,26 @@ public class LiveIndexWriterConfig {
return this; return this;
} }
/**
* Sets the DWPT grouping criteria definition, i.e. the definition used to select the pool group
* from which a DWPT will be selected. Despite the method name, the argument is the complete
* definition and not only its grouping function. The value is stored as given; no null check is
* performed here.
*
* @param dwptGroupingCriteriaDefinition the passed DWPT grouping criteria definition.
* @return this config instance, so that calls can be chained.
*/
public LiveIndexWriterConfig setDWPTGroupingCriteriaFunction(
final DWPTGroupingCriteriaDefinition dwptGroupingCriteriaDefinition) {
this.dwptGroupingCriteriaDefinition = dwptGroupingCriteriaDefinition;
return this;
}
/**
* Returns the DWPT grouping criteria definition used to select the pool group from which a DWPT
* will be selected.
*
* @return the currently configured DWPT grouping criteria definition.
*/
public DWPTGroupingCriteriaDefinition getDwptGroupingCriteriaDefinition() {
return dwptGroupingCriteriaDefinition;
}
/** /**
* Returns <code>true</code> iff the {@link IndexWriter} packs newly written segments in a * Returns <code>true</code> iff the {@link IndexWriter} packs newly written segments in a
* compound file. Default is <code>true</code>. * compound file. Default is <code>true</code>.

View File

@ -19,6 +19,7 @@ package org.apache.lucene.index;
import java.io.IOException; import java.io.IOException;
import java.io.Reader; import java.io.Reader;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.function.Function; import java.util.function.Function;
import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Analyzer;
@ -605,4 +606,45 @@ public class TestDocumentWriter extends LuceneTestCase {
} }
} }
} }
/**
 * Indexes log documents with mixed status codes using the status-code grouping criteria and
 * checks that a commit produces exactly two segments: the first one holding 2xx/3xx logs and the
 * second one holding 4xx/5xx logs.
 */
public void testCriteriaBasedDWPTGrouping() throws IOException {
  final int[] statusCodes = new int[] {200, 201, 400, 404, 500, 200, 300};
  final IndexWriterConfig config = newIndexWriterConfig(new MockAnalyzer(random()));
  config.setMergePolicy(new CriteriaBasedGroupingTieredMergePolicy());
  config.setDWPTGroupingCriteriaFunction(DocHelper.getDWPTCriteriaDefinition());
  // try-with-resources: the writer is closed even when one of the assertions below fails.
  try (IndexWriter writer = new IndexWriter(dir, config)) {
    for (int statusCode : statusCodes) {
      final Document logDocument = DocHelper.createLogDocument(17, "test", statusCode);
      writer.addDocument(logDocument);
    }
    writer.commit();

    final SegmentInfos infos = writer.cloneSegmentInfos();
    assertEquals(2, infos.size());
    final Iterator<SegmentCommitInfo> segInfoIterator = infos.iterator();
    // First segment will contain 2xx and 3xx logs.
    assertFirstDocStatusCodePrefix(segInfoIterator.next(), "2", "3");
    // Second segment will contain 4xx and 5xx logs.
    assertFirstDocStatusCodePrefix(segInfoIterator.next(), "4", "5");
  }
}

/**
 * Opens the given segment and asserts that its first stored document has exactly one
 * "statuscode" field whose value starts with one of the two given prefixes. The reader is
 * always closed, including on assertion failure.
 */
private void assertFirstDocStatusCodePrefix(
    SegmentCommitInfo si, String firstPrefix, String secondPrefix) throws IOException {
  try (SegmentReader reader =
      new SegmentReader(si, Version.LATEST.major, newIOContext(random()))) {
    final Document doc = reader.storedFields().document(0);
    assertNotNull(doc);
    final IndexableField[] fields = doc.getFields("statuscode");
    assertTrue(fields != null && fields.length == 1);
    final String statusCode = fields[0].stringValue();
    assertTrue(statusCode.startsWith(firstPrefix) || statusCode.startsWith(secondPrefix));
  }
}
} }

View File

@ -31,7 +31,7 @@ public class TestDocumentsWriterPerThreadPool extends LuceneTestCase {
try (Directory directory = newDirectory()) { try (Directory directory = newDirectory()) {
DocumentsWriterPerThreadPool pool = DocumentsWriterPerThreadPool pool =
new DocumentsWriterPerThreadPool( new DocumentsWriterPerThreadPool(
() -> (dwptGroupNumber) ->
new DocumentsWriterPerThread( new DocumentsWriterPerThread(
Version.LATEST.major, Version.LATEST.major,
"", "",
@ -41,15 +41,19 @@ public class TestDocumentsWriterPerThreadPool extends LuceneTestCase {
new DocumentsWriterDeleteQueue(null), new DocumentsWriterDeleteQueue(null),
null, null,
new AtomicLong(), new AtomicLong(),
false)); false,
dwptGroupNumber));
DocumentsWriterPerThread first = pool.getAndLock(); DocumentsWriterPerThread first =
pool.getAndLock(DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER);
assertEquals(1, pool.size()); assertEquals(1, pool.size());
DocumentsWriterPerThread second = pool.getAndLock(); DocumentsWriterPerThread second =
pool.getAndLock(DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER);
assertEquals(2, pool.size()); assertEquals(2, pool.size());
pool.marksAsFreeAndUnlock(first); pool.marksAsFreeAndUnlock(first);
assertEquals(2, pool.size()); assertEquals(2, pool.size());
DocumentsWriterPerThread third = pool.getAndLock(); DocumentsWriterPerThread third =
pool.getAndLock(DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER);
assertSame(first, third); assertSame(first, third);
assertEquals(2, pool.size()); assertEquals(2, pool.size());
pool.checkout(third); pool.checkout(third);
@ -71,7 +75,7 @@ public class TestDocumentsWriterPerThreadPool extends LuceneTestCase {
try (Directory directory = newDirectory()) { try (Directory directory = newDirectory()) {
DocumentsWriterPerThreadPool pool = DocumentsWriterPerThreadPool pool =
new DocumentsWriterPerThreadPool( new DocumentsWriterPerThreadPool(
() -> (dwptGroupNumber) ->
new DocumentsWriterPerThread( new DocumentsWriterPerThread(
Version.LATEST.major, Version.LATEST.major,
"", "",
@ -81,9 +85,11 @@ public class TestDocumentsWriterPerThreadPool extends LuceneTestCase {
new DocumentsWriterDeleteQueue(null), new DocumentsWriterDeleteQueue(null),
null, null,
new AtomicLong(), new AtomicLong(),
false)); false,
dwptGroupNumber));
DocumentsWriterPerThread first = pool.getAndLock(); DocumentsWriterPerThread first =
pool.getAndLock(DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER);
pool.lockNewWriters(); pool.lockNewWriters();
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
Thread t = Thread t =
@ -91,7 +97,7 @@ public class TestDocumentsWriterPerThreadPool extends LuceneTestCase {
() -> { () -> {
try { try {
latch.countDown(); latch.countDown();
pool.getAndLock(); pool.getAndLock(DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER);
fail(); fail();
} catch ( } catch (
@SuppressWarnings("unused") @SuppressWarnings("unused")

View File

@ -4126,7 +4126,9 @@ public class TestIndexWriter extends LuceneTestCase {
List<Closeable> states = new ArrayList<>(); List<Closeable> states = new ArrayList<>();
try { try {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
DocumentsWriterPerThread state = w.docWriter.perThreadPool.getAndLock(); DocumentsWriterPerThread state =
w.docWriter.perThreadPool.getAndLock(
DWPTGroupingCriteriaDefinition.DEFAULT_DWPT_GROUP_NUMBER);
states.add(state::unlock); states.add(state::unlock);
state.deleteQueue.getNextSequenceNumber(); state.deleteQueue.getNextSequenceNumber();
} }

View File

@ -19,8 +19,10 @@ package org.apache.lucene.tests.index;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.function.Function;
import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field; import org.apache.lucene.document.Field;
@ -28,6 +30,7 @@ import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.StoredField; import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField; import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField; import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DWPTGroupingCriteriaDefinition;
import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig;
@ -326,4 +329,51 @@ public class DocHelper {
} }
return doc; return doc;
} }
/**
 * Builds a log-entry document. Besides the given id, index name and status code, the document
 * carries a fixed timestamp, client ip and request line.
 *
 * @param n id of the document
 * @param indexName index name where document is added
 * @param statusCode status code of the log entry
 * @return a log document with a given id, indexName and status code
 */
public static Document createLogDocument(int n, String indexName, int statusCode) {
  // Field name/value pairs, listed in the order in which they are added to the document.
  final String[][] nameValuePairs = {
    {"id", Integer.toString(n)},
    {"indexname", indexName},
    {"statuscode", Integer.toString(statusCode)},
    {"@timestamp", "898787158"},
    {"clientip", "5.12.211.1"},
    {"request", "GET /gematu/lowsea.gif HTTP/1.1"}
  };
  final Document logDocument = new Document();
  for (String[] nameValue : nameValuePairs) {
    logDocument.add(new Field(nameValue[0], nameValue[1], STRING_TYPE_STORED_WITH_TVS));
  }
  return logDocument;
}
/**
 * Returns a criteria definition which groups data by status code. Only the first document of a
 * batch is inspected: a "statuscode" field of class 2xx or 3xx maps to group 1, class 4xx or 5xx
 * maps to group 2, and everything else (including an empty batch or a document without a
 * matching status code) maps to group 0. The definition allows a max DWPT group size of 4.
 *
 * @return a criteria definition which groups data by status code.
 */
public static DWPTGroupingCriteriaDefinition getDWPTCriteriaDefinition() {
  final Function<Iterable<? extends Iterable<? extends IndexableField>>, Integer>
      criteriaFunction =
          (docs) -> {
            final Iterator<? extends Iterable<? extends IndexableField>> docsIt =
                docs.iterator();
            if (docsIt.hasNext() == false) {
              // Nothing to inspect: avoid NoSuchElementException on an empty batch.
              return 0;
            }
            for (IndexableField field : docsIt.next()) {
              if (field.stringValue() != null && field.name().equals("statuscode")) {
                final int statusClass = Integer.parseInt(field.stringValue()) / 100;
                if (statusClass == 2 || statusClass == 3) {
                  return 1;
                } else if (statusClass == 4 || statusClass == 5) {
                  return 2;
                }
              }
            }
            return 0;
          };
  return new DWPTGroupingCriteriaDefinition(criteriaFunction, 4);
}
} }