diff --git a/bin/elasticsearch.bat b/bin/elasticsearch.bat
index f894d14a3b8..8bb738da8b7 100644
Binary files a/bin/elasticsearch.bat and b/bin/elasticsearch.bat differ
diff --git a/bin/elasticsearch.in.sh b/bin/elasticsearch.in.sh
index 8bb16e6a11f..9150abbc9ff 100644
--- a/bin/elasticsearch.in.sh
+++ b/bin/elasticsearch.in.sh
@@ -1,6 +1,6 @@
 #!/bin/sh
 
-ES_CLASSPATH=$ES_CLASSPATH:$ES_HOME/lib/*:$ES_HOME/lib/sigar/*
+ES_CLASSPATH=$ES_CLASSPATH:$ES_HOME/lib/${project.build.finalName}.jar:$ES_HOME/lib/*:$ES_HOME/lib/sigar/*
 
 if [ "x$ES_MIN_MEM" = "x" ]; then
     ES_MIN_MEM=256m
diff --git a/pom.xml b/pom.xml
index 2eb76e748c2..6c44056c712 100644
--- a/pom.xml
+++ b/pom.xml
@@ -373,6 +373,28 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>2.5</version>
+                <executions>
+                    <execution>
+                        <id>copy-resources</id>
+                        <phase>prepare-package</phase>
+                        <goals>
+                            <goal>copy-resources</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/bin</outputDirectory>
+                            <resources>
+                                <resource>
+                                    <directory>${basedir}/bin</directory>
+                                    <filtering>true</filtering>
+                                </resource>
+                            </resources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
             <plugin>
                 <artifactId>maven-assembly-plugin</artifactId>
                 <version>2.3</version>
@@ -436,7 +458,8 @@
                             </mapper>
                         </data>
                         <data>
-                            <src>${project.basedir}/bin</src>
+                            <!-- use the bin files filtered into the build directory -->
+                            <src>${project.build.directory}/bin</src>
                             <type>directory</type>
                             <excludes>*.bat, .DS_Store</excludes>
                             <mapper>
diff --git a/src/main/assemblies/targz-bin.xml b/src/main/assemblies/targz-bin.xml
index acc703b4216..0e3650b5e81 100644
--- a/src/main/assemblies/targz-bin.xml
+++ b/src/main/assemblies/targz-bin.xml
@@ -14,6 +14,7 @@
         <directoryMode>0755</directoryMode>
         <fileMode>0755</fileMode>
         <lineEnding>unix</lineEnding>
+        <filtered>true</filtered>
         <includes>
             <include>elasticsearch.in.sh</include>
             <include>elasticsearch</include>
diff --git a/src/main/assemblies/zip-bin.xml b/src/main/assemblies/zip-bin.xml
index 06d77c2b10c..8d0b3e9b0d5 100644
--- a/src/main/assemblies/zip-bin.xml
+++ b/src/main/assemblies/zip-bin.xml
@@ -9,6 +9,7 @@
     <fileSets>
         <fileSet>
+            <filtered>true</filtered>
            <directory>bin</directory>
            <outputDirectory>bin</outputDirectory>
            <lineEnding>dos</lineEnding>
            <includes>
@@ -18,6 +19,7 @@
            </includes>
        </fileSet>
        <fileSet>
+            <filtered>true</filtered>
            <directory>bin</directory>
            <outputDirectory>bin</outputDirectory>
            <fileMode>0755</fileMode>
diff --git a/src/main/java/org/apache/lucene/index/BufferedDeletesStream.java b/src/main/java/org/apache/lucene/index/BufferedDeletesStream.java
new file mode 100644
index 00000000000..cf2f88c05af
--- /dev/null
+++ b/src/main/java/org/apache/lucene/index/BufferedDeletesStream.java
@@ -0,0 +1,451 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.search.DocIdSet;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.QueryWrapperFilter;
+import org.apache.lucene.util.UnicodeUtil;
+import org.elasticsearch.common.Unicode;
+import org.elasticsearch.common.bloom.BloomFilter;
+import org.elasticsearch.index.cache.bloom.BloomCache;
+import org.elasticsearch.index.mapper.internal.UidFieldMapper;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/* Tracks the stream of {@link BufferedDeletes}.
+ * When DocumentsWriter flushes, its buffered
+ * deletes are appended to this stream.
+ * We later apply these deletes (resolve them to the actual
+ * docIDs, per segment) when a merge is started
+ * (only to the to-be-merged segments). We
+ * also apply to all segments when NRT reader is pulled,
+ * commit/close is called, or when too many deletes are
+ * buffered and must be flushed (by RAM usage or by count).
+ *
+ * Each packet is assigned a generation, and each flushed or
+ * merged segment is also assigned a generation, so we can
+ * track which BufferedDeletes packets to apply to any given
+ * segment. */
+
+// LUCENE MONITOR: We copied this class from Lucene, effectively overriding it with our implementation
+// if it comes first in the classpath, allowing for faster apply deletes based on terms
+class BufferedDeletesStream implements XIndexWriter.XBufferedDeletesStream {
+
+    // TODO: maybe linked list?
+    private final List<FrozenBufferedDeletes> deletes = new ArrayList<FrozenBufferedDeletes>();
+
+    // Starts at 1 so that SegmentInfos that have never had
+    // deletes applied (whose bufferedDelGen defaults to 0)
+    // will be correct:
+    private long nextGen = 1;
+
+    // used only by assert
+    private Term lastDeleteTerm;
+
+    private PrintStream infoStream;
+    private final AtomicLong bytesUsed = new AtomicLong();
+    private final AtomicInteger numTerms = new AtomicInteger();
+    private final int messageID;
+
+    private BloomCache bloomCache;
+
+    public BufferedDeletesStream(int messageID) {
+        this.messageID = messageID;
+    }
+
+    private synchronized void message(String message) {
+        if (infoStream != null) {
+            infoStream.println("BD " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message);
+        }
+    }
+
+    public synchronized void setInfoStream(PrintStream infoStream) {
+        this.infoStream = infoStream;
+    }
+
+    public void setBloomCache(BloomCache bloomCache) {
+        this.bloomCache = bloomCache;
+    }
+
+    // Appends a new packet of buffered deletes to the stream,
+    // setting its generation:
+    public synchronized void push(FrozenBufferedDeletes packet) {
+        assert packet.any();
+        assert checkDeleteStats();
+        assert packet.gen < nextGen;
+        deletes.add(packet);
+        numTerms.addAndGet(packet.numTermDeletes);
+        bytesUsed.addAndGet(packet.bytesUsed);
+        if (infoStream != null) {
+            message("push deletes " + packet + " delGen=" + packet.gen + " packetCount=" + deletes.size());
+        }
+        assert checkDeleteStats();
+    }
+
+    public synchronized void clear() {
+        deletes.clear();
+        nextGen = 1;
+        numTerms.set(0);
+        bytesUsed.set(0);
+    }
+
+    public boolean any() {
+        return bytesUsed.get() != 0;
+    }
+
+    public int numTerms() {
+        return numTerms.get();
+    }
+
+    public long bytesUsed() {
+        return bytesUsed.get();
+    }
+
+    public static class ApplyDeletesResult {
+        // True if any actual deletes took place:
+        public final boolean anyDeletes;
+
+        // Current gen, for the merged segment:
+        public final long gen;
+
+        // If non-null, contains segments that are 100% deleted
+        public final List<SegmentInfo> allDeleted;
+
+        ApplyDeletesResult(boolean anyDeletes, long gen, List<SegmentInfo> allDeleted) {
+            this.anyDeletes = anyDeletes;
+            this.gen = gen;
+            this.allDeleted = allDeleted;
+        }
+    }
+
+    // Sorts SegmentInfos from smallest to biggest bufferedDelGen:
+    private static final Comparator<SegmentInfo> sortByDelGen = new Comparator<SegmentInfo>() {
+        // @Override -- not until Java 1.6
+        public int compare(SegmentInfo si1, SegmentInfo si2) {
+            final long cmp = si1.getBufferedDeletesGen() - si2.getBufferedDeletesGen();
+            if (cmp > 0) {
+                return 1;
+            } else if (cmp < 0) {
+                return -1;
+            } else {
+                return 0;
+            }
+        }
+    };
+
+    /**
+     * Resolves the buffered deleted Term/Query/docIDs, into
+     * actual deleted docIDs in the deletedDocs BitVector for
+     * each SegmentReader.
+     */
+    public synchronized ApplyDeletesResult applyDeletes(IndexWriter.ReaderPool readerPool, List<SegmentInfo> infos) throws IOException {
+        final long t0 = System.currentTimeMillis();
+
+        if (infos.size() == 0) {
+            return new ApplyDeletesResult(false, nextGen++, null);
+        }
+
+        assert checkDeleteStats();
+
+        if (!any()) {
+            message("applyDeletes: no deletes; skipping");
+            return new ApplyDeletesResult(false, nextGen++, null);
+        }
+
+        if (infoStream != null) {
+            message("applyDeletes: infos=" + infos + " packetCount=" + deletes.size());
+        }
+
+        List<SegmentInfo> infos2 = new ArrayList<SegmentInfo>();
+        infos2.addAll(infos);
+        Collections.sort(infos2, sortByDelGen);
+
+        CoalescedDeletes coalescedDeletes = null;
+        boolean anyNewDeletes = false;
+
+        int infosIDX = infos2.size() - 1;
+        int delIDX = deletes.size() - 1;
+
+        List<SegmentInfo> allDeleted = null;
+
+        while (infosIDX >= 0) {
+            //System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX);
+
+            final FrozenBufferedDeletes packet = delIDX >= 0 ? deletes.get(delIDX) : null;
+            final SegmentInfo info = infos2.get(infosIDX);
+            final long segGen = info.getBufferedDeletesGen();
+
+            if (packet != null && segGen < packet.gen) {
+                //System.out.println(" coalesce");
+                if (coalescedDeletes == null) {
+                    coalescedDeletes = new CoalescedDeletes();
+                }
+                coalescedDeletes.update(packet);
+                delIDX--;
+            } else if (packet != null && segGen == packet.gen) {
+                //System.out.println(" eq");
+
+                // Lock order: IW -> BD -> RP
+                assert readerPool.infoIsLive(info);
+                SegmentReader reader = readerPool.get(info, false);
+                int delCount = 0;
+                final boolean segAllDeletes;
+                try {
+                    if (coalescedDeletes != null) {
+                        //System.out.println(" del coalesced");
+                        delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
+                        delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
+                    }
+                    //System.out.println(" del exact");
+                    // Don't delete by Term here; DocumentsWriter
+                    // already did that on flush:
+                    delCount += applyQueryDeletes(packet.queriesIterable(), reader);
+                    segAllDeletes = reader.numDocs() == 0;
+                } finally {
+                    readerPool.release(reader);
+                }
+                anyNewDeletes |= delCount > 0;
+
+                if (segAllDeletes) {
+                    if (allDeleted == null) {
+                        allDeleted = new ArrayList<SegmentInfo>();
+                    }
+                    allDeleted.add(info);
+                }
+
+                if (infoStream != null) {
+                    message("seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
+                }
+
+                if (coalescedDeletes == null) {
+                    coalescedDeletes = new CoalescedDeletes();
+                }
+                coalescedDeletes.update(packet);
+                delIDX--;
+                infosIDX--;
+                info.setBufferedDeletesGen(nextGen);
+
+            } else {
+                //System.out.println(" gt");
+
+                if (coalescedDeletes != null) {
+                    // Lock order: IW -> BD -> RP
+                    assert readerPool.infoIsLive(info);
+                    SegmentReader reader = readerPool.get(info, false);
+                    int delCount = 0;
+                    final boolean segAllDeletes;
+                    try {
+                        delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
+                        delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
+                        segAllDeletes = reader.numDocs() == 0;
+                    } finally {
+                        readerPool.release(reader);
+                    }
+                    anyNewDeletes |= delCount > 0;
+
+                    if (segAllDeletes) {
+                        if (allDeleted == null) {
+                            allDeleted = new ArrayList<SegmentInfo>();
+                        }
+                        allDeleted.add(info);
+                    }
+
+                    if (infoStream != null) {
+                        message("seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
+                    }
+                }
+                info.setBufferedDeletesGen(nextGen);
+
+                infosIDX--;
+            }
+        }
+
+        assert checkDeleteStats();
+        if (infoStream != null) {
+            message("applyDeletes took " + (System.currentTimeMillis() - t0) + " msec");
+        }
+        // assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any;
+
+        return new ApplyDeletesResult(anyNewDeletes, nextGen++, allDeleted);
+    }
+
+    public synchronized long getNextGen() {
+        return nextGen++;
+    }
+
+    // Lock order IW -> BD
+    /* Removes any BufferedDeletes that we no longer need to
+     * store because all segments in the index have had the
+     * deletes applied. */
+    public synchronized void prune(SegmentInfos segmentInfos) {
+        assert checkDeleteStats();
+        long minGen = Long.MAX_VALUE;
+        for (SegmentInfo info : segmentInfos) {
+            minGen = Math.min(info.getBufferedDeletesGen(), minGen);
+        }
+
+        if (infoStream != null) {
+            message("prune sis=" + segmentInfos + " minGen=" + minGen + " packetCount=" + deletes.size());
+        }
+
+        final int limit = deletes.size();
+        for (int delIDX = 0; delIDX < limit; delIDX++) {
+            if (deletes.get(delIDX).gen >= minGen) {
+                prune(delIDX);
+                assert checkDeleteStats();
+                return;
+            }
+        }
+
+        // All deletes pruned
+        prune(limit);
+        assert !any();
+        assert checkDeleteStats();
+    }
+
+    private synchronized void prune(int count) {
+        if (count > 0) {
+            if (infoStream != null) {
+                message("pruneDeletes: prune " + count + " packets; " + (deletes.size() - count) + " packets remain");
+            }
+            for (int delIDX = 0; delIDX < count; delIDX++) {
+                final FrozenBufferedDeletes packet = deletes.get(delIDX);
+                numTerms.addAndGet(-packet.numTermDeletes);
+                assert numTerms.get() >= 0;
+                bytesUsed.addAndGet(-packet.bytesUsed);
+                assert bytesUsed.get() >= 0;
+            }
+            deletes.subList(0, count).clear();
+        }
+    }
+
+    // ES CHANGE: Add bloom filter usage
+    // Delete by Term
+    private synchronized long applyTermDeletes(Iterable<Term> termsIter, SegmentReader reader) throws IOException {
+        long delCount = 0;
+
+        assert checkDeleteTerm(null);
+
+        BloomFilter filter = bloomCache == null ?
+                BloomFilter.NONE : bloomCache.filter(reader, UidFieldMapper.NAME, true);
+        UnicodeUtil.UTF8Result utf8 = new UnicodeUtil.UTF8Result();
+
+        TermDocs docs = reader.termDocs();
+
+        for (Term term : termsIter) {
+
+            if (term.field() == UidFieldMapper.NAME) {
+                Unicode.fromStringAsUtf8(term.text(), utf8);
+                if (!filter.isPresent(utf8.result, 0, utf8.length)) {
+                    continue;
+                }
+            }
+            if (docs == null) {
+                docs = reader.termDocs();
+            }
+
+            // Since we visit terms sorted, we gain performance
+            // by re-using the same TermsEnum and seeking only
+            // forwards
+            assert checkDeleteTerm(term);
+            docs.seek(term);
+
+            while (docs.next()) {
+                final int docID = docs.doc();
+                reader.deleteDocument(docID);
+                // TODO: we could/should change
+                // reader.deleteDocument to return boolean
+                // true if it did in fact delete, because here
+                // we could be deleting an already-deleted doc
+                // which makes this an upper bound:
+                delCount++;
+            }
+        }
+
+        return delCount;
+    }
+
+    public static class QueryAndLimit {
+        public final Query query;
+        public final int limit;
+
+        public QueryAndLimit(Query query, int limit) {
+            this.query = query;
+            this.limit = limit;
+        }
+    }
+
+    // Delete by query
+    private synchronized long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, SegmentReader reader) throws IOException {
+        long delCount = 0;
+
+        for (QueryAndLimit ent : queriesIter) {
+            Query query = ent.query;
+            int limit = ent.limit;
+            final DocIdSet docs = new QueryWrapperFilter(query).getDocIdSet(reader);
+            if (docs != null) {
+                final DocIdSetIterator it = docs.iterator();
+                if (it != null) {
+                    while (true) {
+                        int doc = it.nextDoc();
+                        if (doc >= limit)
+                            break;
+
+                        reader.deleteDocument(doc);
+                        // TODO: we could/should change
+                        // reader.deleteDocument to return boolean
+                        // true if it did in fact delete, because here
+                        // we could be deleting an already-deleted doc
+                        // which makes this an upper bound:
+                        delCount++;
+                    }
+                }
+            }
+        }
+
+        return delCount;
+    }
+
+    // used only by assert
+    private boolean checkDeleteTerm(Term term) {
+        if (term != null) {
+            assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0 : "lastTerm=" + lastDeleteTerm + " vs term=" + term;
+        }
+        // TODO: we re-use term now in our merged iterable, but we shouldn't clone, instead copy for this assert
+        lastDeleteTerm = term == null ?
null : new Term(term.field(), term.text()); + return true; + } + + // only for assert + private boolean checkDeleteStats() { + int numTerms2 = 0; + long bytesUsed2 = 0; + for (FrozenBufferedDeletes packet : deletes) { + numTerms2 += packet.numTermDeletes; + bytesUsed2 += packet.bytesUsed; + } + assert numTerms2 == numTerms.get() : "numTerms2=" + numTerms2 + " vs " + numTerms.get(); + assert bytesUsed2 == bytesUsed.get() : "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed; + return true; + } +} diff --git a/src/main/java/org/apache/lucene/index/XIndexWriter.java b/src/main/java/org/apache/lucene/index/XIndexWriter.java new file mode 100644 index 00000000000..72f052a8259 --- /dev/null +++ b/src/main/java/org/apache/lucene/index/XIndexWriter.java @@ -0,0 +1,29 @@ +package org.apache.lucene.index; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.LockObtainFailedException; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.index.cache.bloom.BloomCache; + +import java.io.IOException; + +/** + */ +public class XIndexWriter extends IndexWriter { + + private final ESLogger logger; + + public XIndexWriter(Directory d, IndexWriterConfig conf, ESLogger logger, BloomCache bloomCache) throws CorruptIndexException, LockObtainFailedException, IOException { + super(d, conf); + this.logger = logger; + if (bufferedDeletesStream instanceof XBufferedDeletesStream) { + logger.debug("using bloom filter enhanced delete handling"); + ((XBufferedDeletesStream) bufferedDeletesStream).setBloomCache(bloomCache); + } + } + + public static interface XBufferedDeletesStream { + + void setBloomCache(BloomCache bloomCache); + } +} diff --git a/src/main/java/org/elasticsearch/common/Unicode.java b/src/main/java/org/elasticsearch/common/Unicode.java index a11d5a94331..cd6c4a9b2d4 100644 --- a/src/main/java/org/elasticsearch/common/Unicode.java +++ b/src/main/java/org/elasticsearch/common/Unicode.java @@ -60,6 +60,14 @@ public class Unicode { return result; } + public static void fromStringAsUtf8(String source, UnicodeUtil.UTF8Result result) { + if (source == null) { + result.length = 0; + return; + } + UnicodeUtil.UTF16toUTF8(source, 0, source.length(), result); + } + public static UnicodeUtil.UTF8Result unsafeFromStringAsUtf8(String source) { if (source == null) { return null; diff --git a/src/main/java/org/elasticsearch/index/cache/bloom/simple/SimpleBloomCache.java b/src/main/java/org/elasticsearch/index/cache/bloom/simple/SimpleBloomCache.java index 791dc28a961..f84a8b3e6c0 100644 --- a/src/main/java/org/elasticsearch/index/cache/bloom/simple/SimpleBloomCache.java +++ b/src/main/java/org/elasticsearch/index/cache/bloom/simple/SimpleBloomCache.java @@ -183,6 +183,7 @@ public class SimpleBloomCache extends AbstractIndexComponent implements BloomCac TermDocs termDocs = null; TermEnum termEnum = null; try { + UnicodeUtil.UTF8Result utf8Result = new UnicodeUtil.UTF8Result(); BloomFilter filter = BloomFilterFactory.getFilter(reader.numDocs(), 15); termDocs = reader.termDocs(); termEnum = reader.terms(new Term(field)); @@ -191,7 +192,7 @@ public class SimpleBloomCache extends AbstractIndexComponent implements BloomCac if (term == null || term.field() != field) break; // LUCENE MONITOR: 4.0, move to use bytes! 
- UnicodeUtil.UTF8Result utf8Result = Unicode.fromStringAsUtf8(term.text()); + Unicode.fromStringAsUtf8(term.text(), utf8Result); termDocs.seek(termEnum); while (termDocs.next()) { // when traversing, make sure to ignore deleted docs, so the key->docId will be correct diff --git a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 11f52895673..0aefb7a54ba 100644 --- a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -65,7 +65,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -1307,7 +1306,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { config.setReaderTermsIndexDivisor(termIndexDivisor); config.setMaxThreadStates(indexConcurrency); - indexWriter = new IndexWriter(store.directory(), config); + indexWriter = new XIndexWriter(store.directory(), config, logger, bloomCache); } catch (IOException e) { safeClose(indexWriter); throw e; diff --git a/src/main/java/org/elasticsearch/index/mapper/internal/UidFieldMapper.java b/src/main/java/org/elasticsearch/index/mapper/internal/UidFieldMapper.java index 232374563e4..cf5a7aebf6d 100644 --- a/src/main/java/org/elasticsearch/index/mapper/internal/UidFieldMapper.java +++ b/src/main/java/org/elasticsearch/index/mapper/internal/UidFieldMapper.java @@ -38,7 +38,7 @@ import static org.elasticsearch.index.mapper.MapperBuilders.uid; */ public class UidFieldMapper extends AbstractFieldMapper implements InternalMapper, RootMapper { - public static final String NAME = "_uid"; + public static final String NAME = "_uid".intern(); public static final Term TERM_FACTORY = new Term(NAME, ""); diff --git a/src/main/java/org/elasticsearch/index/search/UidFilter.java b/src/main/java/org/elasticsearch/index/search/UidFilter.java index b0f6a343b40..79e65d896b2 100644 --- a/src/main/java/org/elasticsearch/index/search/UidFilter.java +++ b/src/main/java/org/elasticsearch/index/search/UidFilter.java @@ -69,9 +69,10 @@ public class UidFilter extends Filter { BloomFilter filter = bloomCache.filter(reader, UidFieldMapper.NAME, true); FixedBitSet set = null; TermDocs td = null; + UnicodeUtil.UTF8Result utf8 = new UnicodeUtil.UTF8Result(); try { for (Term uid : uids) { - UnicodeUtil.UTF8Result utf8 = Unicode.fromStringAsUtf8(uid.text()); + Unicode.fromStringAsUtf8(uid.text(), utf8); if (!filter.isPresent(utf8.result, 0, utf8.length)) { continue; } diff --git a/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadIndexingStress.java b/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadIndexingStress.java index 90876fd75f8..20437bc984a 100644 --- a/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadIndexingStress.java +++ b/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadIndexingStress.java @@ -65,11 +65,12 @@ public class SingleThreadIndexingStress { StopWatch stopWatch = new StopWatch().start(); int COUNT = 200000; + int ID_RANGE = 100; System.out.println("Indexing [" + COUNT + "] ..."); int i = 1; for (; i <= COUNT; i++) { // client1.admin().cluster().preparePingSingle("test", "type1", Integer.toString(i)).execute().actionGet(); - client1.prepareIndex("test", 
"type1").setId(Integer.toString(i)).setSource(source(Integer.toString(i), "test" + i)) + client1.prepareIndex("test", "type1").setId(Integer.toString(i % ID_RANGE)).setSource(source(Integer.toString(i), "test" + i)) .setCreate(false).execute().actionGet(); if ((i % 10000) == 0) { System.out.println("Indexed " + i + " took " + stopWatch.stop().lastTaskTime());