Percolator, closes #624.

This commit is contained in:
kimchy 2011-01-13 16:20:31 +02:00
parent 2d180eb28a
commit 180d225016
40 changed files with 1777 additions and 219 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.benchmark.percolator;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -33,8 +34,7 @@ import org.elasticsearch.index.analysis.AnalysisModule;
import org.elasticsearch.index.cache.IndexCacheModule;
import org.elasticsearch.index.engine.IndexEngineModule;
import org.elasticsearch.index.mapper.MapperServiceModule;
import org.elasticsearch.index.percolator.PercolatorModule;
import org.elasticsearch.index.percolator.PercolatorService;
import org.elasticsearch.index.percolator.PercolatorExecutor;
import org.elasticsearch.index.query.IndexQueryParserModule;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.similarity.SimilarityModule;
@ -69,22 +69,26 @@ public class EmbeddedPercolatorBenchmarkTest {
new SimilarityModule(settings),
new IndexQueryParserModule(settings),
new IndexNameModule(index),
new PercolatorModule()
new AbstractModule() {
@Override protected void configure() {
bind(PercolatorExecutor.class).asEagerSingleton();
}
}
).createInjector();
final PercolatorService percolatorService = injector.getInstance(PercolatorService.class);
final PercolatorExecutor percolatorExecutor = injector.getInstance(PercolatorExecutor.class);
XContentBuilder doc = XContentFactory.jsonBuilder().startObject()
XContentBuilder doc = XContentFactory.jsonBuilder().startObject().startObject("doc").startObject("type1")
.field("field1", 1)
.field("field2", "value")
.field("field3", "the quick brown fox jumped over the lazy dog")
.endObject();
.endObject().endObject().endObject();
final byte[] source = doc.copiedBytes();
PercolatorService.Response percolate = percolatorService.percolate(new PercolatorService.Request("type1", source));
PercolatorExecutor.Response percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
for (int i = 0; i < NUMBER_OF_QUERIES; i++) {
percolatorService.addQuery("test" + i, termQuery("field3", "quick"));
percolatorExecutor.addQuery("test" + i, termQuery("field3", "quick"));
}
@ -92,7 +96,7 @@ public class EmbeddedPercolatorBenchmarkTest {
StopWatch stopWatch = new StopWatch().start();
System.out.println("Running " + 1000);
for (long i = 0; i < 1000; i++) {
percolate = percolatorService.percolate(new PercolatorService.Request("type1", source));
percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
}
System.out.println("[Warmup] Percolated in " + stopWatch.stop().totalTime() + " TP Millis " + (NUMBER_OF_ITERATIONS / stopWatch.totalTime().millisFrac()));
@ -103,7 +107,7 @@ public class EmbeddedPercolatorBenchmarkTest {
threads[i] = new Thread(new Runnable() {
@Override public void run() {
for (long i = 0; i < NUMBER_OF_ITERATIONS; i++) {
PercolatorService.Response percolate = percolatorService.percolate(new PercolatorService.Request("type1", source));
PercolatorExecutor.Response percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
}
latch.countDown();
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.percolator;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.node.Node;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
import static org.elasticsearch.node.NodeBuilder.*;
/**
* @author kimchy (shay.banon)
*/
public class SinglePercolatorStressBenchmark {
public static void main(String[] args) throws Exception {
Settings settings = settingsBuilder()
.put("cluster.routing.schedule", 200, TimeUnit.MILLISECONDS)
.put("gateway.type", "none")
.put(SETTING_NUMBER_OF_SHARDS, 2)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.build();
Node[] nodes = new Node[2];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "node" + i)).node();
}
Node client = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "client")).client(true).node();
Client client1 = client.client();
client1.admin().indices().create(createIndexRequest("test")).actionGet();
Thread.sleep(1000);
int COUNT = 200000;
int QUERIES = 10;
// register queries
for (int i = 0; i < QUERIES; i++) {
client1.prepareIndex("_percolator", "test", Integer.toString(i))
.setSource(jsonBuilder().startObject()
.field("query", termQuery("name", "value"))
.endObject())
.setRefresh(true)
.execute().actionGet();
}
StopWatch stopWatch = new StopWatch().start();
System.out.println("Percolating [" + COUNT + "] ...");
int i = 1;
for (; i <= COUNT; i++) {
PercolateResponse percolate = client1.preparePercolate("test").setSource(source(Integer.toString(i), "value"))
.execute().actionGet();
if (percolate.matches().size() != QUERIES) {
System.err.println("No matching number of queries");
}
if ((i % 10000) == 0) {
System.out.println("Percolated " + i + " took " + stopWatch.stop().lastTaskTime());
stopWatch.start();
}
}
System.out.println("Percolation took " + stopWatch.totalTime() + ", TPS " + (((double) COUNT) / stopWatch.totalTime().secondsFrac()));
client.close();
for (Node node : nodes) {
node.close();
}
}
private static XContentBuilder source(String id, String nameValue) throws IOException {
long time = System.currentTimeMillis();
return jsonBuilder().startObject().startObject("doc").startObject("type1")
.field("id", id)
.field("numeric1", time)
.field("numeric2", time)
.field("numeric3", time)
.field("numeric4", time)
.field("numeric5", time)
.field("numeric6", time)
.field("numeric7", time)
.field("numeric8", time)
.field("numeric9", time)
.field("numeric10", time)
.field("name", nameValue)
.endObject().endObject().endObject();
}
}

View File

@ -59,6 +59,7 @@ import org.elasticsearch.action.deletebyquery.TransportShardDeleteByQueryAction;
import org.elasticsearch.action.get.TransportGetAction;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.mlt.TransportMoreLikeThisAction;
import org.elasticsearch.action.percolate.TransportPercolateAction;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.action.search.type.*;
@ -131,5 +132,7 @@ public class TransportActionModule extends AbstractModule {
bind(TransportSearchScrollAction.class).asEagerSingleton();
bind(TransportMoreLikeThisAction.class).asEagerSingleton();
bind(TransportPercolateAction.class).asEagerSingleton();
}
}

View File

@ -44,6 +44,8 @@ public class TransportActions {
public static final String MORE_LIKE_THIS = "indices/moreLikeThis";
public static final String PERCOLATE = "indices/percolate";
public static class Admin {
public static class Indices {

View File

@ -0,0 +1,182 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.percolate;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest;
import org.elasticsearch.common.Required;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import static org.elasticsearch.action.Actions.*;
/**
* @author kimchy
*/
public class PercolateRequest extends SingleCustomOperationRequest {
private String index;
private byte[] source;
private int sourceOffset;
private int sourceLength;
private boolean sourceUnsafe;
PercolateRequest() {
}
/**
* Constructs a new percolate request.
*
* @param index The index name
*/
public PercolateRequest(String index) {
this.index = index;
}
public PercolateRequest index(String index) {
this.index = index;
return this;
}
public String index() {
return this.index;
}
/**
* Before we fork on a local thread, make sure we copy over the bytes if they are unsafe
*/
@Override public void beforeLocalFork() {
source();
}
public byte[] source() {
if (sourceUnsafe || sourceOffset > 0) {
source = Arrays.copyOfRange(source, sourceOffset, sourceOffset + sourceLength);
sourceOffset = 0;
sourceUnsafe = false;
}
return source;
}
public byte[] unsafeSource() {
return this.source;
}
public int unsafeSourceOffset() {
return this.sourceOffset;
}
public int unsafeSourceLength() {
return this.sourceLength;
}
@Required public PercolateRequest source(Map source) throws ElasticSearchGenerationException {
return source(source, XContentType.SMILE);
}
@Required public PercolateRequest source(Map source, XContentType contentType) throws ElasticSearchGenerationException {
try {
XContentBuilder builder = XContentFactory.contentBuilder(contentType);
builder.map(source);
return source(builder);
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + source + "]", e);
}
}
@Required public PercolateRequest source(String source) {
UnicodeUtil.UTF8Result result = Unicode.fromStringAsUtf8(source);
this.source = result.result;
this.sourceOffset = 0;
this.sourceLength = result.length;
this.sourceUnsafe = true;
return this;
}
@Required public PercolateRequest source(XContentBuilder sourceBuilder) {
try {
source = sourceBuilder.unsafeBytes();
sourceOffset = 0;
sourceLength = sourceBuilder.unsafeBytesLength();
sourceUnsafe = true;
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + sourceBuilder + "]", e);
}
return this;
}
public PercolateRequest source(byte[] source) {
return source(source, 0, source.length);
}
@Required public PercolateRequest source(byte[] source, int offset, int length) {
return source(source, offset, length, false);
}
@Required public PercolateRequest source(byte[] source, int offset, int length, boolean unsafe) {
this.source = source;
this.sourceOffset = offset;
this.sourceLength = length;
this.sourceUnsafe = unsafe;
return this;
}
@Override public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
if (index == null) {
validationException = addValidationError("index is missing", validationException);
}
if (source == null) {
validationException = addValidationError("source is missing", validationException);
}
return validationException;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readUTF();
sourceUnsafe = false;
sourceOffset = 0;
sourceLength = in.readVInt();
source = new byte[sourceLength];
in.readFully(source);
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeUTF(index);
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.percolate;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class PercolateResponse implements ActionResponse, Iterable<String> {
private List<String> matches;
PercolateResponse() {
}
public PercolateResponse(List<String> matches) {
this.matches = matches;
}
public List<String> matches() {
return this.matches;
}
@Override public Iterator<String> iterator() {
return matches.iterator();
}
@Override public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
matches = new ArrayList<String>(size);
for (int i = 0; i < size; i++) {
matches.add(in.readUTF());
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(matches.size());
for (String match : matches) {
out.writeUTF(match);
}
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.percolate;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.percolator.PercolatorExecutor;
import org.elasticsearch.index.percolator.PercolatorService;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
* @author kimchy (shay.banon)
*/
public class TransportPercolateAction extends TransportSingleCustomOperationAction<PercolateRequest, PercolateResponse> {
private final IndicesService indicesService;
@Inject public TransportPercolateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
IndicesService indicesService) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
}
@Override protected PercolateRequest newRequest() {
return new PercolateRequest();
}
@Override protected PercolateResponse newResponse() {
return new PercolateResponse();
}
@Override protected String transportAction() {
return TransportActions.PERCOLATE;
}
@Override protected String transportShardAction() {
return "indices/percolate/shard";
}
@Override protected ShardsIterator shards(ClusterState clusterState, PercolateRequest request) {
request.index(clusterState.metaData().concreteIndex(request.index()));
return clusterState.routingTable().index(request.index()).allShardsIt();
}
@Override protected PercolateResponse shardOperation(PercolateRequest request, int shardId) throws ElasticSearchException {
IndexService indexService = indicesService.indexServiceSafe(request.index());
PercolatorService percolatorService = indexService.percolateService();
PercolatorExecutor.Response percolate = percolatorService.percolate(new PercolatorExecutor.Request(request.source()));
return new PercolateResponse(percolate.matches());
}
}

View File

@ -68,6 +68,10 @@ public abstract class SingleCustomOperationRequest implements ActionRequest {
return this;
}
public void beforeLocalFork() {
}
@Override public void readFrom(StreamInput in) throws IOException {
// no need to pass threading over the network, they are always false when coming throw a thread pool
}

View File

@ -120,6 +120,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
final ShardRouting shard = shardsIt.nextActive();
if (shard.currentNodeId().equals(nodes.localNodeId())) {
if (request.operationThreaded()) {
request.beforeLocalFork();
threadPool.execute(new Runnable() {
@Override public void run() {
try {

View File

@ -34,6 +34,8 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.mlt.MoreLikeThisRequest;
import org.elasticsearch.action.percolate.PercolateRequest;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
@ -43,6 +45,7 @@ import org.elasticsearch.client.action.delete.DeleteRequestBuilder;
import org.elasticsearch.client.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.client.action.get.GetRequestBuilder;
import org.elasticsearch.client.action.index.IndexRequestBuilder;
import org.elasticsearch.client.action.percolate.PercolateRequestBuilder;
import org.elasticsearch.client.action.search.SearchRequestBuilder;
import org.elasticsearch.client.action.search.SearchScrollRequestBuilder;
@ -314,4 +317,19 @@ public interface Client {
* @param listener A listener to be notified of the result
*/
void moreLikeThis(MoreLikeThisRequest request, ActionListener<SearchResponse> listener);
/**
* Percolates a request returning the matches documents.
*/
ActionFuture<PercolateResponse> percolate(PercolateRequest request);
/**
* Percolates a request returning the matches documents.
*/
void percolate(PercolateRequest request, ActionListener<PercolateResponse> listener);
/**
* Percolates a request returning the matches documents.
*/
PercolateRequestBuilder preparePercolate(String index);
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.action.percolate;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.percolate.PercolateRequest;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.action.support.BaseRequestBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import javax.annotation.Nullable;
import java.util.Map;
/**
* @author kimchy (shay.banon)
*/
public class PercolateRequestBuilder extends BaseRequestBuilder<PercolateRequest, PercolateResponse> {
public PercolateRequestBuilder(Client client, @Nullable String index) {
super(client, new PercolateRequest(index));
}
/**
* Sets the index to index the document to.
*/
public PercolateRequestBuilder setIndex(String index) {
request.index(index);
return this;
}
/**
* Index the Map as a JSON.
*
* @param source The map to index
*/
public PercolateRequestBuilder setSource(Map<String, Object> source) {
request.source(source);
return this;
}
/**
* Index the Map as the provided content type.
*
* @param source The map to index
*/
public PercolateRequestBuilder setSource(Map<String, Object> source, XContentType contentType) {
request.source(source, contentType);
return this;
}
/**
* Sets the document source to index.
*
* <p>Note, its preferable to either set it using {@link #setSource(org.elasticsearch.common.xcontent.XContentBuilder)}
* or using the {@link #setSource(byte[])}.
*/
public PercolateRequestBuilder setSource(String source) {
request.source(source);
return this;
}
/**
* Sets the content source to index.
*/
public PercolateRequestBuilder setSource(XContentBuilder sourceBuilder) {
request.source(sourceBuilder);
return this;
}
/**
* Sets the document to index in bytes form.
*/
public PercolateRequestBuilder setSource(byte[] source) {
request.source(source);
return this;
}
/**
* Sets the document to index in bytes form (assumed to be safe to be used from different
* threads).
*
* @param source The source to index
* @param offset The offset in the byte array
* @param length The length of the data
*/
public PercolateRequestBuilder setSource(byte[] source, int offset, int length) {
request.source(source, offset, length);
return this;
}
/**
* Sets the document to index in bytes form.
*
* @param source The source to index
* @param offset The offset in the byte array
* @param length The length of the data
* @param unsafe Is the byte array safe to be used form a different thread
*/
public PercolateRequestBuilder setSource(byte[] source, int offset, int length, boolean unsafe) {
request.source(source, offset, length, unsafe);
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/
public PercolateRequestBuilder setListenerThreaded(boolean listenerThreaded) {
request.listenerThreaded(listenerThreaded);
return this;
}
/**
* Controls if the operation will be executed on a separate thread when executed locally. Defaults
* to <tt>true</tt> when running in embedded mode.
*/
public PercolateRequestBuilder setOperationThreaded(boolean operationThreaded) {
request.operationThreaded(operationThreaded);
return this;
}
@Override protected void doExecute(ActionListener<PercolateResponse> listener) {
client.percolate(request, listener);
}
}

View File

@ -41,6 +41,9 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.mlt.MoreLikeThisRequest;
import org.elasticsearch.action.mlt.TransportMoreLikeThisAction;
import org.elasticsearch.action.percolate.PercolateRequest;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.percolate.TransportPercolateAction;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.internal.InternalClient;
@ -76,11 +79,13 @@ public class NodeClient extends AbstractClient implements InternalClient {
private final TransportMoreLikeThisAction moreLikeThisAction;
private final TransportPercolateAction percolateAction;
@Inject public NodeClient(Settings settings, ThreadPool threadPool, NodeAdminClient admin,
TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportBulkAction bulkAction,
TransportDeleteByQueryAction deleteByQueryAction, TransportGetAction getAction, TransportCountAction countAction,
TransportSearchAction searchAction, TransportSearchScrollAction searchScrollAction,
TransportMoreLikeThisAction moreLikeThisAction) {
TransportMoreLikeThisAction moreLikeThisAction, TransportPercolateAction percolateAction) {
this.threadPool = threadPool;
this.admin = admin;
this.indexAction = indexAction;
@ -92,6 +97,7 @@ public class NodeClient extends AbstractClient implements InternalClient {
this.searchAction = searchAction;
this.searchScrollAction = searchScrollAction;
this.moreLikeThisAction = moreLikeThisAction;
this.percolateAction = percolateAction;
}
@Override public ThreadPool threadPool() {
@ -177,4 +183,12 @@ public class NodeClient extends AbstractClient implements InternalClient {
@Override public void moreLikeThis(MoreLikeThisRequest request, ActionListener<SearchResponse> listener) {
moreLikeThisAction.execute(request, listener);
}
@Override public ActionFuture<PercolateResponse> percolate(PercolateRequest request) {
return percolateAction.execute(request);
}
@Override public void percolate(PercolateRequest request, ActionListener<PercolateResponse> listener) {
percolateAction.execute(request, listener);
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.client.action.delete.DeleteRequestBuilder;
import org.elasticsearch.client.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.client.action.get.GetRequestBuilder;
import org.elasticsearch.client.action.index.IndexRequestBuilder;
import org.elasticsearch.client.action.percolate.PercolateRequestBuilder;
import org.elasticsearch.client.action.search.SearchRequestBuilder;
import org.elasticsearch.client.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.client.internal.InternalClient;
@ -83,4 +84,8 @@ public abstract class AbstractClient implements InternalClient {
@Override public CountRequestBuilder prepareCount(String... indices) {
return new CountRequestBuilder(this).setIndices(indices);
}
@Override public PercolateRequestBuilder preparePercolate(String index) {
return new PercolateRequestBuilder(this, index);
}
}

View File

@ -35,6 +35,8 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.mlt.MoreLikeThisRequest;
import org.elasticsearch.action.percolate.PercolateRequest;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
@ -313,4 +315,12 @@ public class TransportClient extends AbstractClient {
@Override public void moreLikeThis(MoreLikeThisRequest request, ActionListener<SearchResponse> listener) {
internalClient.moreLikeThis(request, listener);
}
@Override public ActionFuture<PercolateResponse> percolate(PercolateRequest request) {
return internalClient.percolate(request);
}
@Override public void percolate(PercolateRequest request, ActionListener<PercolateResponse> listener) {
internalClient.percolate(request, listener);
}
}

View File

@ -51,6 +51,7 @@ import org.elasticsearch.client.transport.action.delete.ClientTransportDeleteAct
import org.elasticsearch.client.transport.action.deletebyquery.ClientTransportDeleteByQueryAction;
import org.elasticsearch.client.transport.action.get.ClientTransportGetAction;
import org.elasticsearch.client.transport.action.index.ClientTransportIndexAction;
import org.elasticsearch.client.transport.action.percolate.ClientTransportPercolateAction;
import org.elasticsearch.client.transport.action.search.ClientTransportSearchAction;
import org.elasticsearch.client.transport.action.search.ClientTransportSearchScrollAction;
import org.elasticsearch.common.inject.AbstractModule;
@ -69,6 +70,7 @@ public class ClientTransportActionModule extends AbstractModule {
bind(ClientTransportSearchAction.class).asEagerSingleton();
bind(ClientTransportSearchScrollAction.class).asEagerSingleton();
bind(ClientTransportBulkAction.class).asEagerSingleton();
bind(ClientTransportPercolateAction.class).asEagerSingleton();
bind(ClientTransportIndicesStatusAction.class).asEagerSingleton();
bind(ClientTransportRefreshAction.class).asEagerSingleton();

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transport.action.percolate;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.percolate.PercolateRequest;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.client.transport.action.support.BaseClientTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportService;
/**
* @author kimchy (Shay Banon)
*/
public class ClientTransportPercolateAction extends BaseClientTransportAction<PercolateRequest, PercolateResponse> {
@Inject public ClientTransportPercolateAction(Settings settings, TransportService transportService) {
super(settings, transportService, PercolateResponse.class);
}
@Override protected String action() {
return TransportActions.PERCOLATE;
}
}

View File

@ -35,6 +35,8 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.mlt.MoreLikeThisRequest;
import org.elasticsearch.action.percolate.PercolateRequest;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
@ -49,6 +51,7 @@ import org.elasticsearch.client.transport.action.deletebyquery.ClientTransportDe
import org.elasticsearch.client.transport.action.get.ClientTransportGetAction;
import org.elasticsearch.client.transport.action.index.ClientTransportIndexAction;
import org.elasticsearch.client.transport.action.mlt.ClientTransportMoreLikeThisAction;
import org.elasticsearch.client.transport.action.percolate.ClientTransportPercolateAction;
import org.elasticsearch.client.transport.action.search.ClientTransportSearchAction;
import org.elasticsearch.client.transport.action.search.ClientTransportSearchScrollAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -85,12 +88,14 @@ public class InternalTransportClient extends AbstractClient implements InternalC
private final ClientTransportMoreLikeThisAction moreLikeThisAction;
private final ClientTransportPercolateAction percolateAction;
@Inject public InternalTransportClient(Settings settings, ThreadPool threadPool,
TransportClientNodesService nodesService, InternalTransportAdminClient adminClient,
ClientTransportIndexAction indexAction, ClientTransportDeleteAction deleteAction, ClientTransportBulkAction bulkAction, ClientTransportGetAction getAction,
ClientTransportDeleteByQueryAction deleteByQueryAction, ClientTransportCountAction countAction,
ClientTransportSearchAction searchAction, ClientTransportSearchScrollAction searchScrollAction,
ClientTransportMoreLikeThisAction moreLikeThisAction) {
ClientTransportMoreLikeThisAction moreLikeThisAction, ClientTransportPercolateAction percolateAction) {
this.threadPool = threadPool;
this.nodesService = nodesService;
this.adminClient = adminClient;
@ -104,6 +109,7 @@ public class InternalTransportClient extends AbstractClient implements InternalC
this.searchAction = searchAction;
this.searchScrollAction = searchScrollAction;
this.moreLikeThisAction = moreLikeThisAction;
this.percolateAction = percolateAction;
}
@Override public void close() {
@ -270,4 +276,21 @@ public class InternalTransportClient extends AbstractClient implements InternalC
}
});
}
@Override public ActionFuture<PercolateResponse> percolate(final PercolateRequest request) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<PercolateResponse>>() {
@Override public ActionFuture<PercolateResponse> doWithNode(DiscoveryNode node) throws ElasticSearchException {
return percolateAction.execute(node, request);
}
});
}
@Override public void percolate(final PercolateRequest request, final ActionListener<PercolateResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
percolateAction.execute(node, request, listener);
return null;
}
});
}
}

View File

@ -83,6 +83,8 @@ public class IndexMetaData {
public static final String SETTING_NUMBER_OF_REPLICAS = "index.number_of_replicas";
public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
private final String index;
private final State state;

View File

@ -49,6 +49,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.percolator.PercolatorService;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
@ -163,18 +164,29 @@ public class MetaDataCreateIndexService extends AbstractComponent {
// now, put the request settings, so they override templates
indexSettingsBuilder.put(request.settings);
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {
if (request.index.equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));
} else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
if (request.index.equals(PercolatorService.INDEX_NAME)) {
// if its percolator, always 1 shard
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, 1);
} else {
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {
if (request.index.equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));
} else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
}
}
}
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {
if (request.index.equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
} else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
if (request.index.equals(PercolatorService.INDEX_NAME)) {
// if its percolator, always set number of replicas to 0, and expand to 0-all
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, 0);
indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all");
} else {
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {
if (request.index.equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
} else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
}
}
}
Settings actualIndexSettings = indexSettingsBuilder.build();
@ -320,7 +332,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
if (request.index.contains("#")) {
throw new InvalidIndexNameException(new Index(request.index), request.index, "must not contain '#");
}
if (!request.index.equals(riverIndexName) && request.index.charAt(0) == '_') {
if (!request.index.equals(riverIndexName) && !request.index.equals(PercolatorService.INDEX_NAME) && request.index.charAt(0) == '_') {
throw new InvalidIndexNameException(new Index(request.index), request.index, "must not start with '_'");
}
if (!request.index.toLowerCase().equals(request.index)) {

View File

@ -49,11 +49,9 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
if (!event.state().nodes().localNodeMaster()) {
return;
}
if (!event.nodesChanged()) {
return;
}
// TODO we only need to do that on first create of an index, or the number of nodes changed
for (final IndexMetaData indexMetaData : event.state().metaData()) {
String autoExpandReplicas = indexMetaData.settings().get("index.auto_expand_replicas");
String autoExpandReplicas = indexMetaData.settings().get(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS);
if (autoExpandReplicas != null) {
try {
final int numberOfReplicas = event.state().nodes().dataNodes().size() - 1;
@ -67,6 +65,11 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
max = Integer.parseInt(sMax);
}
// same value, nothing to do there
if (numberOfReplicas == indexMetaData.numberOfReplicas()) {
continue;
}
if (numberOfReplicas >= min && numberOfReplicas <= max) {
Settings settings = ImmutableSettings.settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build();
updateSettings(settings, new String[]{indexMetaData.index()}, new Listener() {

View File

@ -108,6 +108,18 @@ public final class XContentBuilder {
return this;
}
public XContentBuilder field(String name, ToXContent xContent) throws IOException {
field(name);
xContent.toXContent(this, ToXContent.EMPTY_PARAMS);
return this;
}
public XContentBuilder field(String name, ToXContent xContent, ToXContent.Params params) throws IOException {
field(name);
xContent.toXContent(this, params);
return this;
}
public XContentBuilder startObject(String name) throws IOException {
field(name);
startObject();
@ -588,6 +600,8 @@ public final class XContentBuilder {
field(name, (float[]) value);
} else if (value instanceof double[]) {
field(name, (double[]) value);
} else if (value instanceof ToXContent) {
field(name, (ToXContent) value);
} else {
field(name, value.toString());
}

View File

@ -26,5 +26,7 @@ import org.apache.lucene.document.Document;
*/
public interface IdFieldMapper extends FieldMapper<String>, InternalMapper {
public static final String NAME = "_id";
String value(Document document);
}

View File

@ -30,6 +30,10 @@ public class SourceToParse {
return new SourceToParse(source);
}
public static SourceToParse source(XContentParser parser) {
return new SourceToParse(parser);
}
private final byte[] source;
private final XContentParser parser;

View File

@ -392,10 +392,12 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
context.reset(parser, new Document(), type, source.source(), source.flyweight(), listener);
// will result in START_OBJECT
int countDownTokens = 0;
XContentParser.Token token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new MapperException("Malformed content, must start with an object");
}
countDownTokens++;
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME) {
throw new MapperException("Malformed content, after first object, either the type field or the actual properties should exist");
@ -407,6 +409,7 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
// Note, in this case, we only handle plain value types, an object type will be analyzed as if it was the type itself
// and other same level fields will be ignored
token = parser.nextToken();
countDownTokens++;
// commented out, allow for same type with START_OBJECT, we do our best to handle it except for the above corner case
// if (token != XContentParser.Token.START_OBJECT) {
// throw new MapperException("Malformed content, a field with the same name as the type must be an object with the properties/fields within it");
@ -431,6 +434,10 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
rootObjectMapper.parse(context);
for (int i = 0; i < countDownTokens; i++) {
parser.nextToken();
}
// if we did not get the id, we need to parse the uid into the document now, after it was added
if (source.id() == null) {
if (context.id() == null) {

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.percolator;
import org.elasticsearch.index.Index;
/**
* @author kimchy (shay.banon)
*/
public class PercolateIndexUnavailable extends PercolatorException {
public PercolateIndexUnavailable(Index index) {
super(index, "percolator index not allocated on this node");
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.percolator;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexException;
/**
* @author kimchy (shay.banon)
*/
public class PercolatorException extends IndexException {
public PercolatorException(Index index, String msg) {
super(index, msg);
}
public PercolatorException(Index index, String msg, Throwable cause) {
super(index, msg, cause);
}
}

View File

@ -0,0 +1,330 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.percolator;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.query.IndexQueryParser;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.service.IndexShard;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.index.mapper.SourceToParse.*;
/**
* @author kimchy (shay.banon)
*/
public class PercolatorExecutor extends AbstractIndexComponent {
public static class Request {
private final byte[] source;
private final int offset;
private final int length;
public Request(byte[] source) {
this(source, 0, source.length);
}
public Request(byte[] source, int offset, int length) {
this.source = source;
this.offset = offset;
this.length = length;
}
public byte[] source() {
return source;
}
}
public static final class Response {
private final List<String> matches;
private final boolean mappersAdded;
public Response(List<String> matches, boolean mappersAdded) {
this.matches = matches;
this.mappersAdded = mappersAdded;
}
public boolean mappersAdded() {
return this.mappersAdded;
}
public List<String> matches() {
return matches;
}
}
private final MapperService mapperService;
private final IndexQueryParserService queryParserService;
private volatile ImmutableMap<String, Query> queries = ImmutableMap.of();
@Inject public PercolatorExecutor(Index index, @IndexSettings Settings indexSettings,
MapperService mapperService, IndexQueryParserService queryParserService) {
super(index, indexSettings);
this.mapperService = mapperService;
this.queryParserService = queryParserService;
}
public synchronized void close() {
ImmutableMap<String, Query> old = queries;
queries = ImmutableMap.of();
old.clear();
}
public void addQuery(String name, QueryBuilder queryBuilder) throws ElasticSearchException {
try {
XContentBuilder builder = XContentFactory.smileBuilder()
.startObject().field("query", queryBuilder).endObject();
FastByteArrayOutputStream unsafeBytes = builder.unsafeStream();
addQuery(name, unsafeBytes.unsafeByteArray(), 0, unsafeBytes.size());
} catch (IOException e) {
throw new ElasticSearchException("Failed to add query [" + name + "]", e);
}
}
public void addQuery(String name, byte[] source) throws ElasticSearchException {
addQuery(name, source, 0, source.length);
}
public void addQuery(String name, byte[] source, int sourceOffset, int sourceLength) throws ElasticSearchException {
XContentParser parser = null;
try {
parser = XContentFactory.xContent(source, sourceOffset, sourceLength).createParser(source, sourceOffset, sourceLength);
Query query = null;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if ("query".equals(currentFieldName)) {
IndexQueryParser queryParser = queryParserService.defaultIndexQueryParser();
query = queryParser.parse(parser).query();
}
}
}
addQuery(name, query);
} catch (IOException e) {
throw new ElasticSearchException("Failed to add query [" + name + "]", e);
} finally {
if (parser != null) {
parser.close();
}
}
}
public synchronized void addQuery(String name, Query query) {
this.queries = MapBuilder.newMapBuilder(queries).put(name, query).immutableMap();
}
public synchronized void removeQuery(String name) {
this.queries = MapBuilder.newMapBuilder(queries).remove(name).immutableMap();
}
public Response percolate(final Request request) throws ElasticSearchException {
return percolate(request, null, null);
}
public Response percolate(final Request request, @Nullable final IndexService percolatorIndex, @Nullable final IndexShard percolatorShard) throws ElasticSearchException {
Query query = null;
ParsedDocument doc = null;
XContentParser parser = null;
try {
parser = XContentFactory.xContent(request.source()).createParser(request.source());
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if ("query".equals(currentFieldName)) {
IndexQueryParser queryParser = queryParserService.defaultIndexQueryParser();
query = queryParser.parse(parser).query();
} else if ("doc".equals(currentFieldName)) {
// the first level should be the type
token = parser.nextToken();
assert token == XContentParser.Token.FIELD_NAME;
String type = parser.currentName();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type);
doc = docMapper.parse(source(parser).type(type).flyweight(true));
}
} else if (token == null) {
break;
}
}
} catch (IOException e) {
throw new PercolatorException(index, "failed to parse request", e);
} finally {
if (parser != null) {
parser.close();
}
}
if (doc == null) {
throw new PercolatorException(index, "No doc to percolate in the request");
}
// first, parse the source doc into a MemoryIndex
final MemoryIndex memoryIndex = new MemoryIndex();
for (Fieldable field : doc.doc().getFields()) {
if (!field.isIndexed()) {
continue;
}
TokenStream tokenStream = field.tokenStreamValue();
if (tokenStream != null) {
memoryIndex.addField(field.name(), tokenStream, field.getBoost());
} else {
Reader reader = field.readerValue();
if (reader != null) {
try {
memoryIndex.addField(field.name(), doc.analyzer().reusableTokenStream(field.name(), reader), field.getBoost() * doc.doc().getBoost());
} catch (IOException e) {
throw new MapperParsingException("Failed to analyze field [" + field.name() + "]", e);
}
} else {
String value = field.stringValue();
if (value != null) {
try {
memoryIndex.addField(field.name(), doc.analyzer().reusableTokenStream(field.name(), new FastStringReader(value)), field.getBoost() * doc.doc().getBoost());
} catch (IOException e) {
throw new MapperParsingException("Failed to analyze field [" + field.name() + "]", e);
}
}
}
}
}
final IndexSearcher searcher = memoryIndex.createSearcher();
List<String> matches = new ArrayList<String>();
if (query == null) {
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
for (Map.Entry<String, Query> entry : queries.entrySet()) {
try {
searcher.search(entry.getValue(), collector);
} catch (IOException e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
}
if (collector.exists()) {
matches.add(entry.getKey());
}
}
} else {
Engine.Searcher percolatorSearcher = percolatorShard.searcher();
try {
percolatorSearcher.searcher().search(query, new QueryCollector(logger, queries, searcher, percolatorIndex, matches));
} catch (IOException e) {
logger.warn("failed to execute", e);
} finally {
percolatorSearcher.release();
}
}
return new Response(matches, doc.mappersAdded());
}
static class QueryCollector extends Collector {
private final IndexSearcher searcher;
private final IndexService percolatorIndex;
private final List<String> matches;
private final ImmutableMap<String, Query> queries;
private final ESLogger logger;
private final Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
QueryCollector(ESLogger logger, ImmutableMap<String, Query> queries, IndexSearcher searcher, IndexService percolatorIndex, List<String> matches) {
this.logger = logger;
this.queries = queries;
this.searcher = searcher;
this.percolatorIndex = percolatorIndex;
this.matches = matches;
}
private FieldData fieldData;
@Override public void setScorer(Scorer scorer) throws IOException {
}
@Override public void collect(int doc) throws IOException {
String id = fieldData.stringValue(doc);
Query query = queries.get(id);
if (query == null) {
// log???
return;
}
// run the query
try {
searcher.search(query, collector);
if (collector.exists()) {
matches.add(id);
}
} catch (IOException e) {
logger.warn("[" + id + "] failed to execute query", e);
}
}
@Override public void setNextReader(IndexReader reader, int docBase) throws IOException {
fieldData = percolatorIndex.cache().fieldData().cache(FieldDataType.DefaultTypes.STRING, reader, IdFieldMapper.NAME);
}
@Override public boolean acceptsDocsOutOfOrder() {
return true;
}
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.inject.AbstractModule;
public class PercolatorModule extends AbstractModule {
@Override protected void configure() {
bind(PercolatorExecutor.class).asEagerSingleton();
bind(PercolatorService.class).asEagerSingleton();
}
}

View File

@ -19,207 +19,203 @@
package org.elasticsearch.index.percolator;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.regex.Regex;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.*;
import org.elasticsearch.common.lucene.search.TermFilter;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.query.IndexQueryParser;
import org.elasticsearch.index.query.IndexQueryParserMissingException;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldSelector;
import org.elasticsearch.index.mapper.TypeFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.OperationListener;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import javax.annotation.Nullable;
import javax.inject.Inject;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.index.mapper.SourceToParse.*;
/**
* @author kimchy (shay.banon)
*/
public class PercolatorService extends AbstractIndexComponent {
public static class Request {
private final String type;
private final byte[] source;
public static final String INDEX_NAME = "_percolator";
private String match;
private String unmatch;
private final IndicesService indicesService;
public Request(String type, byte[] source) {
this.type = type;
this.source = source;
}
private final PercolatorExecutor percolator;
public String type() {
return type;
}
private final ShardLifecycleListener shardLifecycleListener;
public byte[] source() {
return source;
}
private final Object mutex = new Object();
public String match() {
return this.match;
}
private boolean initialQueriesFetchDone = false;
public Request match(String match) {
this.match = match;
return this;
}
public String unmatch() {
return this.unmatch;
}
public Request unmatch(String unmatch) {
this.unmatch = unmatch;
return this;
}
}
public static final class Response {
private final List<String> matches;
private final boolean mappersAdded;
public Response(List<String> matches, boolean mappersAdded) {
this.matches = matches;
this.mappersAdded = mappersAdded;
}
public boolean mappersAdded() {
return this.mappersAdded;
}
public List<String> matches() {
return matches;
}
}
private final MapperService mapperService;
private final IndexQueryParserService queryParserService;
private volatile ImmutableMap<String, Query> queries = ImmutableMap.of();
@Inject public PercolatorService(Index index, @IndexSettings Settings indexSettings,
MapperService mapperService, IndexQueryParserService queryParserService) {
@Inject public PercolatorService(Index index, @IndexSettings Settings indexSettings, IndicesService indicesService,
PercolatorExecutor percolator) {
super(index, indexSettings);
this.mapperService = mapperService;
this.queryParserService = queryParserService;
this.indicesService = indicesService;
this.percolator = percolator;
this.shardLifecycleListener = new ShardLifecycleListener();
this.indicesService.indicesLifecycle().addListener(shardLifecycleListener);
}
public void addQuery(String name, QueryBuilder queryBuilder) {
addQuery(name, null, queryBuilder);
public void close() {
this.indicesService.indicesLifecycle().removeListener(shardLifecycleListener);
}
public void addQuery(String name, @Nullable String queryParserName, QueryBuilder queryBuilder) {
FastByteArrayOutputStream unsafeBytes = queryBuilder.buildAsUnsafeBytes();
addQuery(name, queryParserName, unsafeBytes.unsafeByteArray(), 0, unsafeBytes.size());
public PercolatorExecutor.Response percolate(PercolatorExecutor.Request request) throws PercolatorException {
IndexService percolatorIndex = indicesService.indexService(INDEX_NAME);
if (percolatorIndex == null) {
throw new PercolateIndexUnavailable(new Index(INDEX_NAME));
}
if (percolatorIndex.numberOfShards() == 0) {
throw new PercolateIndexUnavailable(new Index(INDEX_NAME));
}
IndexShard percolatorShard = percolatorIndex.shard(0);
return percolator.percolate(request, percolatorIndex, percolatorShard);
}
public void addQuery(String name, @Nullable String queryParserName,
byte[] querySource, int querySourceOffset, int querySourceLength) throws ElasticSearchException {
IndexQueryParser queryParser = queryParserService.defaultIndexQueryParser();
if (queryParserName != null) {
queryParser = queryParserService.indexQueryParser(queryParserName);
if (queryParser == null) {
throw new IndexQueryParserMissingException(queryParserName);
}
private void loadQueries(String indexName) {
IndexService indexService = percolatorIndexService();
IndexShard shard = indexService.shard(0);
Engine.Searcher searcher = shard.searcher();
try {
// create a query to fetch all queries that are registered under the index name (which is the type
// in the percolator).
Query query = new DeletionAwareConstantScoreQuery(indexQueriesFilter(indexName));
searcher.searcher().search(query, new QueriesLoaderCollector());
} catch (IOException e) {
throw new PercolatorException(index, "failed to load queries from percolator index");
} finally {
searcher.release();
}
}
private Filter indexQueriesFilter(String indexName) {
return percolatorIndexService().cache().filter().cache(new TermFilter(new Term(TypeFieldMapper.NAME, indexName)));
}
private boolean percolatorAllocated() {
if (!indicesService.hasIndex(INDEX_NAME)) {
return false;
}
if (percolatorIndexService().numberOfShards() == 0) {
return false;
}
if (percolatorIndexService().shard(0).state() != IndexShardState.STARTED) {
return false;
}
return true;
}
private IndexService percolatorIndexService() {
return indicesService.indexService(INDEX_NAME);
}
class QueriesLoaderCollector extends Collector {
private FieldData fieldData;
private IndexReader reader;
@Override public void setScorer(Scorer scorer) throws IOException {
}
Query query = queryParser.parse(querySource, querySourceOffset, querySourceLength).query();
addQuery(name, query);
}
public synchronized void addQuery(String name, Query query) {
this.queries = MapBuilder.newMapBuilder(queries).put(name, query).immutableMap();
}
public synchronized void removeQuery(String name) {
this.queries = MapBuilder.newMapBuilder(queries).remove(name).immutableMap();
}
public Response percolate(Request request) throws ElasticSearchException {
// first, parse the source doc into a MemoryIndex
final MemoryIndex memoryIndex = new MemoryIndex();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(request.type());
ParsedDocument doc = docMapper.parse(source(request.source()).type(request.type()).flyweight(true));
for (Fieldable field : doc.doc().getFields()) {
if (!field.isIndexed()) {
continue;
}
TokenStream tokenStream = field.tokenStreamValue();
if (tokenStream != null) {
memoryIndex.addField(field.name(), tokenStream, field.getBoost());
} else {
Reader reader = field.readerValue();
if (reader != null) {
try {
memoryIndex.addField(field.name(), doc.analyzer().reusableTokenStream(field.name(), reader), field.getBoost() * doc.doc().getBoost());
} catch (IOException e) {
throw new MapperParsingException("Failed to analyze field [" + field.name() + "]", e);
}
} else {
String value = field.stringValue();
if (value != null) {
try {
memoryIndex.addField(field.name(), doc.analyzer().reusableTokenStream(field.name(), new FastStringReader(value)), field.getBoost() * doc.doc().getBoost());
} catch (IOException e) {
throw new MapperParsingException("Failed to analyze field [" + field.name() + "]", e);
}
}
}
}
}
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
List<String> matches = new ArrayList<String>();
IndexSearcher searcher = memoryIndex.createSearcher();
for (Map.Entry<String, Query> entry : queries.entrySet()) {
if (request.match() != null) {
if (!Regex.simpleMatch(request.match(), entry.getKey())) {
continue;
}
}
if (request.unmatch() != null) {
if (Regex.simpleMatch(request.unmatch(), entry.getKey())) {
continue;
}
}
@Override public void collect(int doc) throws IOException {
String id = fieldData.stringValue(doc);
// the _source is the query
Document document = reader.document(doc, SourceFieldSelector.INSTANCE);
byte[] source = document.getBinaryValue(SourceFieldMapper.NAME);
try {
searcher.search(entry.getValue(), collector);
} catch (IOException e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
}
if (collector.exists()) {
matches.add(entry.getKey());
percolator.addQuery(id, source, 0, source.length);
} catch (Exception e) {
logger.warn("failed to add query [{}]", e, id);
}
}
return new Response(matches, doc.mappersAdded());
@Override public void setNextReader(IndexReader reader, int docBase) throws IOException {
this.reader = reader;
fieldData = percolatorIndexService().cache().fieldData().cache(FieldDataType.DefaultTypes.STRING, reader, IdFieldMapper.NAME);
}
@Override public boolean acceptsDocsOutOfOrder() {
return true;
}
}
class ShardLifecycleListener extends IndicesLifecycle.Listener {
@Override public void afterIndexShardCreated(IndexShard indexShard) {
if (indexShard.shardId().index().name().equals(INDEX_NAME)) {
indexShard.addListener(new RealTimePercolatorOperationListener());
}
}
@Override public void afterIndexShardStarted(IndexShard indexShard) {
if (indexShard.shardId().index().name().equals(INDEX_NAME)) {
// percolator index has started, fetch what we can from it and initialize the indices
// we have
synchronized (mutex) {
if (initialQueriesFetchDone) {
return;
}
// we load the queries for all existing indices
for (IndexService indexService : indicesService) {
loadQueries(indexService.index().name());
}
initialQueriesFetchDone = true;
}
}
if (!indexShard.shardId().index().equals(index())) {
// not our index, bail
return;
}
if (!percolatorAllocated()) {
return;
}
// we are only interested when the first shard on this node has been created for an index
// when it does, fetch the relevant queries if not fetched already
if (indicesService.indexService(indexShard.shardId().index().name()).numberOfShards() != 1) {
return;
}
synchronized (mutex) {
if (initialQueriesFetchDone) {
return;
}
// we load queries for this index
loadQueries(index.name());
initialQueriesFetchDone = true;
}
}
}
class RealTimePercolatorOperationListener extends OperationListener {
@Override public Engine.Create beforeCreate(Engine.Create create) {
percolator.addQuery(create.id(), create.source());
return create;
}
@Override public Engine.Index beforeIndex(Engine.Index index) {
percolator.addQuery(index.id(), index.source());
return index;
}
@Override public Engine.Delete beforeDelete(Engine.Delete delete) {
percolator.removeQuery(delete.id());
return delete;
}
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.query;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexComponent;
/**
@ -36,4 +37,6 @@ public interface IndexQueryParser extends IndexComponent {
ParsedQuery parse(String source) throws ElasticSearchException;
ParsedQuery parse(QueryBuilder queryBuilder) throws ElasticSearchException;
ParsedQuery parse(XContentParser parser) throws ElasticSearchException;
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.IndexEngine;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.percolator.PercolatorService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
@ -45,6 +46,8 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard> {
IndexCache cache();
PercolatorService percolateService();
AnalysisService analysisService();
MapperService mapperService();

View File

@ -45,6 +45,7 @@ import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule;
import org.elasticsearch.index.percolator.PercolatorService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShardManagement;
@ -91,6 +92,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private final InternalIndicesLifecycle indicesLifecycle;
private final PercolatorService percolatorService;
private final AnalysisService analysisService;
private final MapperService mapperService;
@ -114,13 +117,14 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private final CleanCacheOnIndicesLifecycleListener cleanCacheOnIndicesLifecycleListener = new CleanCacheOnIndicesLifecycleListener();
@Inject public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool,
AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService,
PercolatorService percolatorService, AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService,
IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore) {
super(index, indexSettings);
this.injector = injector;
this.nodeEnv = nodeEnv;
this.threadPool = threadPool;
this.indexSettings = indexSettings;
this.percolatorService = percolatorService;
this.analysisService = analysisService;
this.mapperService = mapperService;
this.queryParserService = queryParserService;
@ -180,6 +184,10 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return indexCache;
}
@Override public PercolatorService percolateService() {
return this.percolatorService;
}
@Override public AnalysisService analysisService() {
return this.analysisService;
}

View File

@ -38,6 +38,10 @@ import javax.annotation.Nullable;
@ThreadSafe
public interface IndexShard extends IndexShardComponent {
void addListener(OperationListener listener);
void removeListener(OperationListener listener);
ShardRouting routingEntry();
IndexShardState state();

View File

@ -56,6 +56,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import static org.elasticsearch.index.mapper.SourceToParse.*;
@ -95,6 +96,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private RecoveryStatus peerRecoveryStatus;
private CopyOnWriteArrayList<OperationListener> listeners = null;
@Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndicesLifecycle indicesLifecycle, Store store, Engine engine, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache) {
super(shardId, indexSettings);
@ -112,6 +115,23 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
this.checkIndex = indexSettings.getAsBoolean("index.shard.check_index", false);
}
@Override public synchronized void addListener(OperationListener listener) {
if (listeners == null) {
listeners = new CopyOnWriteArrayList<OperationListener>();
}
listeners.add(listener);
}
@Override public synchronized void removeListener(OperationListener listener) {
if (listeners == null) {
return;
}
listeners.remove(listener);
if (listeners.isEmpty()) {
listeners = null;
}
}
public Store store() {
return this.store;
}
@ -220,6 +240,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override public ParsedDocument create(Engine.Create create) throws ElasticSearchException {
writeAllowed();
if (listeners != null) {
for (OperationListener listener : listeners) {
create = listener.beforeCreate(create);
}
}
if (logger.isTraceEnabled()) {
logger.trace("index {}", create.doc());
}
@ -235,6 +260,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override public ParsedDocument index(Engine.Index index) throws ElasticSearchException {
writeAllowed();
if (listeners != null) {
for (OperationListener listener : listeners) {
index = listener.beforeIndex(index);
}
}
if (logger.isTraceEnabled()) {
logger.trace("index {}", index.doc());
}
@ -249,6 +279,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override public void delete(Engine.Delete delete) throws ElasticSearchException {
writeAllowed();
if (listeners != null) {
for (OperationListener listener : listeners) {
delete = listener.beforeDelete(delete);
}
}
if (logger.isTraceEnabled()) {
logger.trace("delete [{}]", delete.uid().text());
}
@ -257,6 +292,34 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override public EngineException[] bulk(Engine.Bulk bulk) throws ElasticSearchException {
writeAllowed();
if (listeners != null) {
for (int i = 0; i < bulk.ops().length; i++) {
Engine.Operation op = bulk.ops()[i];
if (op == null) {
continue;
}
switch (op.opType()) {
case CREATE:
Engine.Create create = (Engine.Create) op;
for (OperationListener listener : listeners) {
bulk.ops()[i] = listener.beforeCreate(create);
}
break;
case INDEX:
Engine.Index index = (Engine.Index) op;
for (OperationListener listener : listeners) {
bulk.ops()[i] = listener.beforeIndex(index);
}
break;
case DELETE:
Engine.Delete delete = (Engine.Delete) op;
for (OperationListener listener : listeners) {
bulk.ops()[i] = listener.beforeDelete(delete);
}
break;
}
}
}
if (logger.isTraceEnabled()) {
logger.trace("bulk, items [{}]", bulk.ops().length);
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard.service;
import org.elasticsearch.index.engine.Engine;
/**
* @author kimchy (shay.banon)
*/
public abstract class OperationListener {
public Engine.Create beforeCreate(Engine.Create create) {
return create;
}
public Engine.Index beforeIndex(Engine.Index index) {
return index;
}
public Engine.Delete beforeDelete(Engine.Delete delete) {
return delete;
}
}

View File

@ -48,6 +48,7 @@ import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexGatewayModule;
import org.elasticsearch.index.mapper.MapperServiceModule;
import org.elasticsearch.index.percolator.PercolatorModule;
import org.elasticsearch.index.percolator.PercolatorService;
import org.elasticsearch.index.query.IndexQueryParserModule;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.service.InternalIndexService;
@ -289,6 +290,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
((InternalIndexService) indexService).close(delete, reason);
indexInjector.getInstance(PercolatorService.class).close();
indexInjector.getInstance(IndexCache.class).close();
indexInjector.getInstance(AnalysisService.class).close();
indexInjector.getInstance(IndexEngine.class).close();

View File

@ -56,6 +56,7 @@ import org.elasticsearch.rest.action.get.RestGetAction;
import org.elasticsearch.rest.action.index.RestIndexAction;
import org.elasticsearch.rest.action.main.RestMainAction;
import org.elasticsearch.rest.action.mlt.RestMoreLikeThisAction;
import org.elasticsearch.rest.action.percolate.RestPercolateAction;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
@ -116,5 +117,7 @@ public class RestActionModule extends AbstractModule {
bind(RestSearchScrollAction.class).asEagerSingleton();
bind(RestMoreLikeThisAction.class).asEagerSingleton();
bind(RestPercolateAction.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.percolate;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.percolate.PercolateRequest;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import javax.inject.Inject;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.*;
import static org.elasticsearch.rest.RestResponse.Status.*;
/**
* @author kimchy (shay.banon)
*/
public class RestPercolateAction extends BaseRestHandler {
@Inject public RestPercolateAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(GET, "/{index}/_percolate", this);
controller.registerHandler(POST, "/{index}/_percolate", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
PercolateRequest percolateRequest = new PercolateRequest(request.param("index"));
percolateRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
// we just send a response, no need to fork
percolateRequest.listenerThreaded(false);
// we don't spawn, then fork if local
percolateRequest.operationThreaded(true);
client.percolate(percolateRequest, new ActionListener<PercolateResponse>() {
@Override public void onResponse(PercolateResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject();
builder.field(Fields.OK, true);
builder.startArray(Fields.MATCHES);
for (String match : response) {
builder.value(match);
}
builder.endArray();
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (Exception e) {
onFailure(e);
}
}
@Override public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
static final class Fields {
static final XContentBuilderString OK = new XContentBuilderString("ok");
static final XContentBuilderString MATCHES = new XContentBuilderString("matches");
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.percolator;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -47,9 +48,9 @@ import static org.hamcrest.Matchers.*;
* @author kimchy (shay.banon)
*/
@Test
public class SimplePercolatorTests {
public class PercolatorExecutorTests {
private PercolatorService percolatorService;
private PercolatorExecutor percolatorExecutor;
@BeforeTest public void buildPercolatorService() {
Settings settings = ImmutableSettings.settingsBuilder()
@ -67,46 +68,43 @@ public class SimplePercolatorTests {
new SimilarityModule(settings),
new IndexQueryParserModule(settings),
new IndexNameModule(index),
new PercolatorModule()
new AbstractModule() {
@Override protected void configure() {
bind(PercolatorExecutor.class).asEagerSingleton();
}
}
).createInjector();
percolatorService = injector.getInstance(PercolatorService.class);
percolatorExecutor = injector.getInstance(PercolatorExecutor.class);
}
@Test public void testSimplePercolator() throws Exception {
// introduce the doc
XContentBuilder doc = XContentFactory.jsonBuilder().startObject()
XContentBuilder doc = XContentFactory.jsonBuilder().startObject().startObject("doc").startObject("type1")
.field("field1", 1)
.field("field2", "value")
.endObject();
.endObject().endObject().endObject();
byte[] source = doc.copiedBytes();
PercolatorService.Response percolate = percolatorService.percolate(new PercolatorService.Request("type1", source));
PercolatorExecutor.Response percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
assertThat(percolate.matches(), hasSize(0));
// add a query
percolatorService.addQuery("test1", termQuery("field2", "value"));
percolatorExecutor.addQuery("test1", termQuery("field2", "value"));
percolate = percolatorService.percolate(new PercolatorService.Request("type1", source));
percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
assertThat(percolate.matches(), hasSize(1));
assertThat(percolate.matches(), hasItem("test1"));
percolatorService.addQuery("test2", termQuery("field1", 1));
percolatorExecutor.addQuery("test2", termQuery("field1", 1));
percolate = percolatorService.percolate(new PercolatorService.Request("type1", source));
percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
assertThat(percolate.matches(), hasSize(2));
assertThat(percolate.matches(), hasItems("test1", "test2"));
percolate = percolatorService.percolate(new PercolatorService.Request("type1", source).match("*2"));
assertThat(percolate.matches(), hasSize(1));
assertThat(percolate.matches(), hasItems("test2"));
percolate = percolatorService.percolate(new PercolatorService.Request("type1", source).match("*").unmatch("*1"));
assertThat(percolate.matches(), hasSize(1));
assertThat(percolate.matches(), hasItems("test2"));
percolatorService.removeQuery("test2");
percolate = percolatorService.percolate(new PercolatorService.Request("type1", source));
percolatorExecutor.removeQuery("test2");
percolate = percolatorExecutor.percolate(new PercolatorExecutor.Request(source));
assertThat(percolate.matches(), hasSize(1));
assertThat(percolate.matches(), hasItems("test1"));
}

View File

@ -0,0 +1,185 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.percolator;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class SimplePercolatorTests extends AbstractNodesTests {
private Client client;
@BeforeClass public void createNodes() throws Exception {
startNode("node1");
startNode("node2");
client = getClient();
}
@AfterClass public void closeNodes() {
client.close();
closeAllNodes();
}
protected Client getClient() {
return client("node1");
}
@Test public void registerPercolatorAndThenCreateAnIndex() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
client.admin().indices().prepareDelete("_percolator").execute().actionGet();
} catch (Exception e) {
// ignore
}
logger.info("--> register a query");
client.prepareIndex("_percolator", "test", "kuku")
.setSource(jsonBuilder().startObject()
.field("color", "blue")
.field("query", termQuery("field1", "value1"))
.endObject())
.execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForActiveShards(2).execute().actionGet();
client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
PercolateResponse percolate = client.preparePercolate("test").setSource(jsonBuilder().startObject().startObject("doc").startObject("type1")
.field("field1", "value1")
.endObject().endObject().endObject())
.execute().actionGet();
assertThat(percolate.matches().size(), equalTo(1));
}
@Test public void createIndexAndThenRegisterPercolator() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
client.admin().indices().prepareDelete("_percolator").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
logger.info("--> register a query");
client.prepareIndex("_percolator", "test", "kuku")
.setSource(jsonBuilder().startObject()
.field("color", "blue")
.field("query", termQuery("field1", "value1"))
.endObject())
.execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForActiveShards(4).execute().actionGet();
PercolateResponse percolate = client.preparePercolate("test").setSource(jsonBuilder().startObject().startObject("doc").startObject("type1")
.field("field1", "value1")
.endObject().endObject().endObject())
.execute().actionGet();
assertThat(percolate.matches().size(), equalTo(1));
}
@Test public void dynamicAddingRemovingQueries() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
client.admin().indices().prepareDelete("_percolator").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
logger.info("--> register a query 1");
client.prepareIndex("_percolator", "test", "kuku")
.setSource(jsonBuilder().startObject()
.field("color", "blue")
.field("query", termQuery("field1", "value1"))
.endObject())
.setRefresh(true)
.execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForActiveShards(4).execute().actionGet();
PercolateResponse percolate = client.preparePercolate("test").setSource(jsonBuilder().startObject().startObject("doc").startObject("type1")
.field("field1", "value1")
.endObject().endObject().endObject())
.execute().actionGet();
assertThat(percolate.matches().size(), equalTo(1));
assertThat(percolate.matches(), hasItem("kuku"));
logger.info("--> register a query 2");
client.prepareIndex("_percolator", "test", "bubu")
.setSource(jsonBuilder().startObject()
.field("color", "green")
.field("query", termQuery("field1", "value2"))
.endObject())
.setRefresh(true)
.execute().actionGet();
percolate = client.preparePercolate("test").setSource(jsonBuilder().startObject().startObject("doc").startObject("type1")
.field("field1", "value2")
.endObject().endObject().endObject())
.execute().actionGet();
assertThat(percolate.matches().size(), equalTo(1));
assertThat(percolate.matches(), hasItem("bubu"));
logger.info("--> register a query 3");
client.prepareIndex("_percolator", "test", "susu")
.setSource(jsonBuilder().startObject()
.field("color", "red")
.field("query", termQuery("field1", "value2"))
.endObject())
.setRefresh(true)
.execute().actionGet();
percolate = client.preparePercolate("test").setSource(jsonBuilder().startObject()
.startObject("doc").startObject("type1")
.field("field1", "value2")
.endObject().endObject()
.field("query", termQuery("color", "red"))
.endObject())
.execute().actionGet();
assertThat(percolate.matches().size(), equalTo(1));
assertThat(percolate.matches(), hasItem("susu"));
logger.info("--> deleting query 1");
client.prepareDelete("_percolator", "test", "kuku").setRefresh(true).execute().actionGet();
percolate = client.preparePercolate("test").setSource(jsonBuilder().startObject().startObject("doc").startObject("type1")
.field("field1", "value1")
.endObject().endObject().endObject())
.execute().actionGet();
assertThat(percolate.matches().size(), equalTo(0));
}
}