internal refactoring/simplification in percolator
This commit is contained in:
parent
f2bd2c7bbd
commit
b4e5a542f3
|
@ -85,7 +85,7 @@ public class EmbeddedPercolatorBenchmarkTest {
|
|||
.endObject().endObject().endObject();
|
||||
final byte[] source = doc.copiedBytes();
|
||||
|
||||
PercolatorExecutor.Response percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
|
||||
PercolatorExecutor.Response percolate = percolatorExecutor.percolate(new PercolatorExecutor.SourceRequest(source));
|
||||
|
||||
for (int i = 0; i < NUMBER_OF_QUERIES; i++) {
|
||||
percolatorExecutor.addQuery("test" + i, termQuery("field3", "quick"));
|
||||
|
@ -96,7 +96,7 @@ public class EmbeddedPercolatorBenchmarkTest {
|
|||
StopWatch stopWatch = new StopWatch().start();
|
||||
System.out.println("Running " + 1000);
|
||||
for (long i = 0; i < 1000; i++) {
|
||||
percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
|
||||
percolate = percolatorExecutor.percolate(new PercolatorExecutor.SourceRequest(source));
|
||||
}
|
||||
System.out.println("[Warmup] Percolated in " + stopWatch.stop().totalTime() + " TP Millis " + (NUMBER_OF_ITERATIONS / stopWatch.totalTime().millisFrac()));
|
||||
|
||||
|
@ -107,7 +107,7 @@ public class EmbeddedPercolatorBenchmarkTest {
|
|||
threads[i] = new Thread(new Runnable() {
|
||||
@Override public void run() {
|
||||
for (long i = 0; i < NUMBER_OF_ITERATIONS; i++) {
|
||||
PercolatorExecutor.Response percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
|
||||
PercolatorExecutor.Response percolate = percolatorExecutor.percolate(new PercolatorExecutor.SourceRequest(source));
|
||||
}
|
||||
latch.countDown();
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ public class TransportPercolateAction extends TransportSingleCustomOperationActi
|
|||
IndexService indexService = indicesService.indexServiceSafe(request.index());
|
||||
PercolatorService percolatorService = indexService.percolateService();
|
||||
|
||||
PercolatorExecutor.Response percolate = percolatorService.percolate(new PercolatorExecutor.Request(request.source()));
|
||||
PercolatorExecutor.Response percolate = percolatorService.percolate(new PercolatorExecutor.SourceRequest(request.source()));
|
||||
return new PercolateResponse(percolate.matches());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
|
|||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.AbstractIndexComponent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.cache.IndexCache;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.field.data.FieldData;
|
||||
import org.elasticsearch.index.field.data.FieldDataType;
|
||||
|
@ -50,7 +51,9 @@ import org.elasticsearch.index.query.IndexQueryParserService;
|
|||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.service.IndexService;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.service.IndexShard;
|
||||
import org.elasticsearch.indices.IndicesLifecycle;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
|
@ -66,16 +69,16 @@ import static org.elasticsearch.index.mapper.SourceToParse.*;
|
|||
*/
|
||||
public class PercolatorExecutor extends AbstractIndexComponent {
|
||||
|
||||
public static class Request {
|
||||
public static class SourceRequest {
|
||||
private final byte[] source;
|
||||
private final int offset;
|
||||
private final int length;
|
||||
|
||||
public Request(byte[] source) {
|
||||
public SourceRequest(byte[] source) {
|
||||
this(source, 0, source.length);
|
||||
}
|
||||
|
||||
public Request(byte[] source, int offset, int length) {
|
||||
public SourceRequest(byte[] source, int offset, int length) {
|
||||
this.source = source;
|
||||
this.offset = offset;
|
||||
this.length = length;
|
||||
|
@ -84,6 +87,32 @@ public class PercolatorExecutor extends AbstractIndexComponent {
|
|||
public byte[] source() {
|
||||
return source;
|
||||
}
|
||||
|
||||
public int offset() {
|
||||
return this.offset;
|
||||
}
|
||||
|
||||
public int length() {
|
||||
return this.length;
|
||||
}
|
||||
}
|
||||
|
||||
public static class DocAndQueryRequest {
|
||||
private final ParsedDocument doc;
|
||||
@Nullable private final Query query;
|
||||
|
||||
public DocAndQueryRequest(ParsedDocument doc, @Nullable Query query) {
|
||||
this.doc = doc;
|
||||
this.query = query;
|
||||
}
|
||||
|
||||
public ParsedDocument doc() {
|
||||
return this.doc;
|
||||
}
|
||||
|
||||
@Nullable Query query() {
|
||||
return this.query;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class Response {
|
||||
|
@ -108,16 +137,39 @@ public class PercolatorExecutor extends AbstractIndexComponent {
|
|||
|
||||
private final IndexQueryParserService queryParserService;
|
||||
|
||||
private final IndexCache indexCache;
|
||||
|
||||
private volatile ImmutableMap<String, Query> queries = ImmutableMap.of();
|
||||
|
||||
|
||||
private final PercolatorIndexAndShardListener percolatorIndexAndShardListener = new PercolatorIndexAndShardListener();
|
||||
|
||||
private volatile IndicesLifecycle indicesLifecycle;
|
||||
|
||||
private volatile IndexService percolatorIndex;
|
||||
|
||||
private volatile IndexShard percolatorShard;
|
||||
|
||||
@Inject public PercolatorExecutor(Index index, @IndexSettings Settings indexSettings,
|
||||
MapperService mapperService, IndexQueryParserService queryParserService) {
|
||||
MapperService mapperService, IndexQueryParserService queryParserService,
|
||||
IndexCache indexCache) {
|
||||
super(index, indexSettings);
|
||||
this.mapperService = mapperService;
|
||||
this.queryParserService = queryParserService;
|
||||
this.indexCache = indexCache;
|
||||
}
|
||||
|
||||
public void setIndicesLifecycle(IndicesLifecycle indicesLifecycle) {
|
||||
this.indicesLifecycle = indicesLifecycle;
|
||||
if (indicesLifecycle != null) {
|
||||
indicesLifecycle.addListener(percolatorIndexAndShardListener);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void close() {
|
||||
if (indicesLifecycle != null) {
|
||||
indicesLifecycle.removeListener(percolatorIndexAndShardListener);
|
||||
}
|
||||
ImmutableMap<String, Query> old = queries;
|
||||
queries = ImmutableMap.of();
|
||||
old.clear();
|
||||
|
@ -181,17 +233,13 @@ public class PercolatorExecutor extends AbstractIndexComponent {
|
|||
this.queries = MapBuilder.newMapBuilder(this.queries).putAll(queries).immutableMap();
|
||||
}
|
||||
|
||||
public Response percolate(final Request request) throws ElasticSearchException {
|
||||
return percolate(request, null, null);
|
||||
}
|
||||
|
||||
public Response percolate(final Request request, @Nullable final IndexService percolatorIndex, @Nullable final IndexShard percolatorShard) throws ElasticSearchException {
|
||||
public Response percolate(final SourceRequest request) throws ElasticSearchException {
|
||||
Query query = null;
|
||||
ParsedDocument doc = null;
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
|
||||
parser = XContentFactory.xContent(request.source()).createParser(request.source());
|
||||
parser = XContentFactory.xContent(request.source(), request.offset(), request.length()).createParser(request.source(), request.offset(), request.length());
|
||||
String currentFieldName = null;
|
||||
XContentParser.Token token;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
|
@ -225,11 +273,14 @@ public class PercolatorExecutor extends AbstractIndexComponent {
|
|||
throw new PercolatorException(index, "No doc to percolate in the request");
|
||||
}
|
||||
|
||||
return percolate(new DocAndQueryRequest(doc, query));
|
||||
}
|
||||
|
||||
public Response percolate(DocAndQueryRequest request) throws ElasticSearchException {
|
||||
// first, parse the source doc into a MemoryIndex
|
||||
final MemoryIndex memoryIndex = new MemoryIndex();
|
||||
|
||||
for (Fieldable field : doc.doc().getFields()) {
|
||||
for (Fieldable field : request.doc().doc().getFields()) {
|
||||
if (!field.isIndexed()) {
|
||||
continue;
|
||||
}
|
||||
|
@ -240,7 +291,7 @@ public class PercolatorExecutor extends AbstractIndexComponent {
|
|||
Reader reader = field.readerValue();
|
||||
if (reader != null) {
|
||||
try {
|
||||
memoryIndex.addField(field.name(), doc.analyzer().reusableTokenStream(field.name(), reader), field.getBoost() * doc.doc().getBoost());
|
||||
memoryIndex.addField(field.name(), request.doc().analyzer().reusableTokenStream(field.name(), reader), field.getBoost() * request.doc().doc().getBoost());
|
||||
} catch (IOException e) {
|
||||
throw new MapperParsingException("Failed to analyze field [" + field.name() + "]", e);
|
||||
}
|
||||
|
@ -248,7 +299,7 @@ public class PercolatorExecutor extends AbstractIndexComponent {
|
|||
String value = field.stringValue();
|
||||
if (value != null) {
|
||||
try {
|
||||
memoryIndex.addField(field.name(), doc.analyzer().reusableTokenStream(field.name(), new FastStringReader(value)), field.getBoost() * doc.doc().getBoost());
|
||||
memoryIndex.addField(field.name(), request.doc().analyzer().reusableTokenStream(field.name(), new FastStringReader(value)), field.getBoost() * request.doc().doc().getBoost());
|
||||
} catch (IOException e) {
|
||||
throw new MapperParsingException("Failed to analyze field [" + field.name() + "]", e);
|
||||
}
|
||||
|
@ -260,7 +311,7 @@ public class PercolatorExecutor extends AbstractIndexComponent {
|
|||
final IndexSearcher searcher = memoryIndex.createSearcher();
|
||||
|
||||
List<String> matches = new ArrayList<String>();
|
||||
if (query == null) {
|
||||
if (request.query() == null) {
|
||||
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
|
||||
for (Map.Entry<String, Query> entry : queries.entrySet()) {
|
||||
try {
|
||||
|
@ -274,9 +325,12 @@ public class PercolatorExecutor extends AbstractIndexComponent {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
if (percolatorIndex == null || percolatorShard == null) {
|
||||
throw new PercolateIndexUnavailable(new Index(PercolatorService.INDEX_NAME));
|
||||
}
|
||||
Engine.Searcher percolatorSearcher = percolatorShard.searcher();
|
||||
try {
|
||||
percolatorSearcher.searcher().search(query, new QueryCollector(logger, queries, searcher, percolatorIndex, matches));
|
||||
percolatorSearcher.searcher().search(request.query(), new QueryCollector(logger, queries, searcher, percolatorIndex, matches));
|
||||
} catch (IOException e) {
|
||||
logger.warn("failed to execute", e);
|
||||
} finally {
|
||||
|
@ -284,7 +338,35 @@ public class PercolatorExecutor extends AbstractIndexComponent {
|
|||
}
|
||||
}
|
||||
|
||||
return new Response(matches, doc.mappersAdded());
|
||||
indexCache.clear(searcher.getIndexReader());
|
||||
|
||||
return new Response(matches, request.doc().mappersAdded());
|
||||
}
|
||||
|
||||
class PercolatorIndexAndShardListener extends IndicesLifecycle.Listener {
|
||||
@Override public void afterIndexCreated(IndexService indexService) {
|
||||
if (indexService.index().name().equals(PercolatorService.INDEX_NAME)) {
|
||||
percolatorIndex = indexService;
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void afterIndexClosed(Index index, boolean delete) {
|
||||
if (index.name().equals(PercolatorService.INDEX_NAME)) {
|
||||
percolatorIndex = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void afterIndexShardCreated(IndexShard indexShard) {
|
||||
if (indexShard.shardId().index().name().equals(PercolatorService.INDEX_NAME)) {
|
||||
percolatorShard = indexShard;
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void afterIndexShardClosed(ShardId shardId, boolean delete) {
|
||||
if (shardId.index().name().equals(PercolatorService.INDEX_NAME)) {
|
||||
percolatorShard = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static class QueryCollector extends Collector {
|
||||
|
|
|
@ -71,22 +71,15 @@ public class PercolatorService extends AbstractIndexComponent {
|
|||
this.percolator = percolator;
|
||||
this.shardLifecycleListener = new ShardLifecycleListener();
|
||||
this.indicesService.indicesLifecycle().addListener(shardLifecycleListener);
|
||||
this.percolator.setIndicesLifecycle(indicesService.indicesLifecycle());
|
||||
}
|
||||
|
||||
public void close() {
|
||||
this.indicesService.indicesLifecycle().removeListener(shardLifecycleListener);
|
||||
}
|
||||
|
||||
public PercolatorExecutor.Response percolate(PercolatorExecutor.Request request) throws PercolatorException {
|
||||
IndexService percolatorIndex = indicesService.indexService(INDEX_NAME);
|
||||
if (percolatorIndex == null) {
|
||||
throw new PercolateIndexUnavailable(new Index(INDEX_NAME));
|
||||
}
|
||||
if (percolatorIndex.numberOfShards() == 0) {
|
||||
throw new PercolateIndexUnavailable(new Index(INDEX_NAME));
|
||||
}
|
||||
IndexShard percolatorShard = percolatorIndex.shard(0);
|
||||
return percolator.percolate(request, percolatorIndex, percolatorShard);
|
||||
public PercolatorExecutor.Response percolate(PercolatorExecutor.SourceRequest request) throws PercolatorException {
|
||||
return percolator.percolate(request);
|
||||
}
|
||||
|
||||
private void loadQueries(String indexName) {
|
||||
|
@ -104,7 +97,6 @@ public class PercolatorService extends AbstractIndexComponent {
|
|||
throw new PercolatorException(index, "failed to load queries from percolator index");
|
||||
} finally {
|
||||
searcher.release();
|
||||
indexService.cache().clear(searcher.reader());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -86,25 +86,25 @@ public class PercolatorExecutorTests {
|
|||
.endObject().endObject().endObject();
|
||||
byte[] source = doc.copiedBytes();
|
||||
|
||||
PercolatorExecutor.Response percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
|
||||
PercolatorExecutor.Response percolate = percolatorExecutor.percolate(new PercolatorExecutor.SourceRequest(source));
|
||||
assertThat(percolate.matches(), hasSize(0));
|
||||
|
||||
// add a query
|
||||
percolatorExecutor.addQuery("test1", termQuery("field2", "value"));
|
||||
|
||||
percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
|
||||
percolate = percolatorExecutor.percolate(new PercolatorExecutor.SourceRequest(source));
|
||||
assertThat(percolate.matches(), hasSize(1));
|
||||
assertThat(percolate.matches(), hasItem("test1"));
|
||||
|
||||
percolatorExecutor.addQuery("test2", termQuery("field1", 1));
|
||||
|
||||
percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
|
||||
percolate = percolatorExecutor.percolate(new PercolatorExecutor.SourceRequest(source));
|
||||
assertThat(percolate.matches(), hasSize(2));
|
||||
assertThat(percolate.matches(), hasItems("test1", "test2"));
|
||||
|
||||
|
||||
percolatorExecutor.removeQuery("test2");
|
||||
percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
|
||||
percolate = percolatorExecutor.percolate(new PercolatorExecutor.SourceRequest(source));
|
||||
assertThat(percolate.matches(), hasSize(1));
|
||||
assertThat(percolate.matches(), hasItems("test1"));
|
||||
}
|
||||
|
|
|
@ -57,6 +57,10 @@ public class SimplePercolatorTests extends AbstractNodesTests {
|
|||
@Test public void registerPercolatorAndThenCreateAnIndex() throws Exception {
|
||||
try {
|
||||
client.admin().indices().prepareDelete("test").execute().actionGet();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
try {
|
||||
client.admin().indices().prepareDelete("_percolator").execute().actionGet();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
|
@ -84,6 +88,10 @@ public class SimplePercolatorTests extends AbstractNodesTests {
|
|||
@Test public void createIndexAndThenRegisterPercolator() throws Exception {
|
||||
try {
|
||||
client.admin().indices().prepareDelete("test").execute().actionGet();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
try {
|
||||
client.admin().indices().prepareDelete("_percolator").execute().actionGet();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
|
@ -121,6 +129,10 @@ public class SimplePercolatorTests extends AbstractNodesTests {
|
|||
@Test public void dynamicAddingRemovingQueries() throws Exception {
|
||||
try {
|
||||
client.admin().indices().prepareDelete("test").execute().actionGet();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
try {
|
||||
client.admin().indices().prepareDelete("_percolator").execute().actionGet();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
|
|
Loading…
Reference in New Issue