mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Analyze API: An API to analyzer custom text based on an optional analyzer, closes #529.
This commit is contained in:
parent
8689e5cf16
commit
b4113d57d3
@ -31,6 +31,7 @@ import org.elasticsearch.action.admin.cluster.ping.replication.TransportShardRep
|
||||
import org.elasticsearch.action.admin.cluster.ping.single.TransportSinglePingAction;
|
||||
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
|
||||
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
|
||||
import org.elasticsearch.action.admin.indices.analyze.TransportAnalyzeAction;
|
||||
import org.elasticsearch.action.admin.indices.cache.clear.TransportClearIndicesCacheAction;
|
||||
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
|
||||
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
|
||||
@ -90,6 +91,7 @@ public class TransportActionModule extends AbstractModule {
|
||||
bind(TransportDeleteMappingAction.class).asEagerSingleton();
|
||||
bind(TransportIndicesAliasesAction.class).asEagerSingleton();
|
||||
bind(TransportUpdateSettingsAction.class).asEagerSingleton();
|
||||
bind(TransportAnalyzeAction.class).asEagerSingleton();
|
||||
|
||||
bind(TransportGatewaySnapshotAction.class).asEagerSingleton();
|
||||
|
||||
|
@ -57,6 +57,7 @@ public class TransportActions {
|
||||
public static final String STATUS = "indices/status";
|
||||
public static final String ALIASES = "indices/aliases";
|
||||
public static final String UPDATE_SETTINGS = "indices/updateSettings";
|
||||
public static final String ANALYZE = "indices/analyze";
|
||||
|
||||
public static class Gateway {
|
||||
public static final String SNAPSHOT = "indices/gateway/snapshot";
|
||||
|
@ -0,0 +1,113 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.analyze;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.action.Actions.*;
|
||||
|
||||
/**
|
||||
* A request to analyze a text associated with a specific index. Allow to provide
|
||||
* the actual analyzer name to perform the analysis with.
|
||||
*
|
||||
* @author kimchy
|
||||
*/
|
||||
public class AnalyzeRequest extends SingleCustomOperationRequest {
|
||||
|
||||
private String index;
|
||||
|
||||
private String text;
|
||||
|
||||
private String analyzer;
|
||||
|
||||
AnalyzeRequest() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new analyzer request for the provided index and text.
|
||||
*
|
||||
* @param index The index name
|
||||
* @param text The text to analyze
|
||||
*/
|
||||
public AnalyzeRequest(String index, String text) {
|
||||
this.index = index;
|
||||
this.text = text;
|
||||
}
|
||||
|
||||
public String text() {
|
||||
return this.text;
|
||||
}
|
||||
|
||||
public AnalyzeRequest index(String index) {
|
||||
this.index = index;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String index() {
|
||||
return this.index;
|
||||
}
|
||||
|
||||
public AnalyzeRequest analyzer(String analyzer) {
|
||||
this.analyzer = analyzer;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String analyzer() {
|
||||
return this.analyzer;
|
||||
}
|
||||
|
||||
@Override public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = super.validate();
|
||||
if (index == null) {
|
||||
validationException = addValidationError("index is missing", validationException);
|
||||
}
|
||||
if (text == null) {
|
||||
validationException = addValidationError("text is missing", validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
index = in.readUTF();
|
||||
text = in.readUTF();
|
||||
if (in.readBoolean()) {
|
||||
analyzer = in.readUTF();
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeUTF(index);
|
||||
out.writeUTF(text);
|
||||
if (analyzer == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
out.writeUTF(analyzer);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,196 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.analyze;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class AnalyzeResponse implements ActionResponse, Iterable<AnalyzeResponse.AnalyzeToken>, ToXContent {
|
||||
|
||||
public static class AnalyzeToken implements Streamable {
|
||||
private String term;
|
||||
private int startOffset;
|
||||
private int endOffset;
|
||||
private int position;
|
||||
private String type;
|
||||
|
||||
AnalyzeToken() {
|
||||
}
|
||||
|
||||
public AnalyzeToken(String term, int position, int startOffset, int endOffset, String type) {
|
||||
this.term = term;
|
||||
this.position = position;
|
||||
this.startOffset = startOffset;
|
||||
this.endOffset = endOffset;
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public String term() {
|
||||
return this.term;
|
||||
}
|
||||
|
||||
public String getTerm() {
|
||||
return term();
|
||||
}
|
||||
|
||||
public int startOffset() {
|
||||
return this.startOffset;
|
||||
}
|
||||
|
||||
public int getStartOffset() {
|
||||
return startOffset();
|
||||
}
|
||||
|
||||
public int endOffset() {
|
||||
return this.endOffset;
|
||||
}
|
||||
|
||||
public int getEndOffset() {
|
||||
return endOffset();
|
||||
}
|
||||
|
||||
public int position() {
|
||||
return this.position;
|
||||
}
|
||||
|
||||
public int getPosition() {
|
||||
return position();
|
||||
}
|
||||
|
||||
public String type() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
public static AnalyzeToken readAnalyzeToken(StreamInput in) throws IOException {
|
||||
AnalyzeToken analyzeToken = new AnalyzeToken();
|
||||
analyzeToken.readFrom(in);
|
||||
return analyzeToken;
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
term = in.readUTF();
|
||||
startOffset = in.readInt();
|
||||
endOffset = in.readInt();
|
||||
position = in.readVInt();
|
||||
if (in.readBoolean()) {
|
||||
type = in.readUTF();
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeUTF(term);
|
||||
out.writeInt(startOffset);
|
||||
out.writeInt(endOffset);
|
||||
out.writeVInt(position);
|
||||
if (type == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(false);
|
||||
out.writeUTF(type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<AnalyzeToken> tokens;
|
||||
|
||||
AnalyzeResponse() {
|
||||
}
|
||||
|
||||
public AnalyzeResponse(List<AnalyzeToken> tokens) {
|
||||
this.tokens = tokens;
|
||||
}
|
||||
|
||||
public List<AnalyzeToken> tokens() {
|
||||
return this.tokens;
|
||||
}
|
||||
|
||||
public List<AnalyzeToken> getTokens() {
|
||||
return tokens();
|
||||
}
|
||||
|
||||
@Override public Iterator<AnalyzeToken> iterator() {
|
||||
return tokens.iterator();
|
||||
}
|
||||
|
||||
@Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
String format = params.param("format", "detailed");
|
||||
if ("detailed".equals(format)) {
|
||||
builder.startArray("tokens");
|
||||
for (AnalyzeToken token : tokens) {
|
||||
builder.startObject();
|
||||
builder.field("token", token.term());
|
||||
builder.field("start_offset", token.startOffset());
|
||||
builder.field("end_offset", token.endOffset());
|
||||
builder.field("type", token.type());
|
||||
builder.field("position", token.position());
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endArray();
|
||||
} else if ("text".equals(format)) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
int lastPosition = 0;
|
||||
for (AnalyzeToken token : tokens) {
|
||||
if (lastPosition != token.position()) {
|
||||
if (lastPosition != 0) {
|
||||
sb.append("\n").append(token.position()).append(": \n");
|
||||
}
|
||||
lastPosition = token.position();
|
||||
}
|
||||
sb.append('[')
|
||||
.append(token.term()).append(":")
|
||||
.append(token.startOffset()).append("->").append(token.endOffset()).append(":")
|
||||
.append(token.type())
|
||||
.append("]\n");
|
||||
}
|
||||
builder.field("tokens", sb);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
int size = in.readVInt();
|
||||
tokens = new ArrayList<AnalyzeToken>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
tokens.add(AnalyzeToken.readAnalyzeToken(in));
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(tokens.size());
|
||||
for (AnalyzeToken token : tokens) {
|
||||
token.writeTo(out);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,127 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.analyze;
|
||||
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.analysis.TokenStream;
|
||||
import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
|
||||
import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute;
|
||||
import org.apache.lucene.analysis.tokenattributes.TermAttribute;
|
||||
import org.apache.lucene.analysis.tokenattributes.TypeAttribute;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
||||
import org.elasticsearch.action.TransportActions;
|
||||
import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.common.collect.Lists;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.FastStringReader;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.service.IndexService;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class TransportAnalyzeAction extends TransportSingleCustomOperationAction<AnalyzeRequest, AnalyzeResponse> {
|
||||
|
||||
private final IndicesService indicesService;
|
||||
|
||||
@Inject public TransportAnalyzeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
|
||||
IndicesService indicesService) {
|
||||
super(settings, threadPool, clusterService, transportService);
|
||||
this.indicesService = indicesService;
|
||||
}
|
||||
|
||||
@Override protected AnalyzeRequest newRequest() {
|
||||
return new AnalyzeRequest();
|
||||
}
|
||||
|
||||
@Override protected AnalyzeResponse newResponse() {
|
||||
return new AnalyzeResponse();
|
||||
}
|
||||
|
||||
@Override protected String transportAction() {
|
||||
return TransportActions.Admin.Indices.ANALYZE;
|
||||
}
|
||||
|
||||
@Override protected String transportShardAction() {
|
||||
return "indices/analyze/shard";
|
||||
}
|
||||
|
||||
@Override protected ShardsIterator shards(ClusterState clusterState, AnalyzeRequest request) {
|
||||
request.index(clusterState.metaData().concreteIndex(request.index()));
|
||||
return clusterState.routingTable().index(request.index()).allShardsIt();
|
||||
}
|
||||
|
||||
@Override protected AnalyzeResponse shardOperation(AnalyzeRequest request, int shardId) throws ElasticSearchException {
|
||||
IndexService indexService = indicesService.indexServiceSafe(request.index());
|
||||
Analyzer analyzer = null;
|
||||
String field = "contents";
|
||||
if (request.analyzer() != null) {
|
||||
analyzer = indexService.analysisService().analyzer(request.analyzer());
|
||||
} else {
|
||||
analyzer = indexService.analysisService().defaultIndexAnalyzer();
|
||||
}
|
||||
if (analyzer == null) {
|
||||
throw new ElasticSearchIllegalArgumentException("failed to find analyzer");
|
||||
}
|
||||
|
||||
List<AnalyzeResponse.AnalyzeToken> tokens = Lists.newArrayList();
|
||||
TokenStream stream = null;
|
||||
try {
|
||||
stream = analyzer.reusableTokenStream(field, new FastStringReader(request.text()));
|
||||
stream.reset();
|
||||
TermAttribute term = stream.addAttribute(TermAttribute.class);
|
||||
PositionIncrementAttribute posIncr = stream.addAttribute(PositionIncrementAttribute.class);
|
||||
OffsetAttribute offset = stream.addAttribute(OffsetAttribute.class);
|
||||
TypeAttribute type = stream.addAttribute(TypeAttribute.class);
|
||||
|
||||
int position = 0;
|
||||
while (stream.incrementToken()) {
|
||||
int increment = posIncr.getPositionIncrement();
|
||||
if (increment > 0) {
|
||||
position = position + increment;
|
||||
}
|
||||
tokens.add(new AnalyzeResponse.AnalyzeToken(term.term(), position, offset.startOffset(), offset.endOffset(), type.type()));
|
||||
}
|
||||
stream.end();
|
||||
} catch (IOException e) {
|
||||
throw new ElasticSearchException("failed to analyze", e);
|
||||
} finally {
|
||||
if (stream != null) {
|
||||
try {
|
||||
stream.close();
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new AnalyzeResponse(tokens);
|
||||
}
|
||||
}
|
@ -0,0 +1,78 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.single.custom;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public abstract class SingleCustomOperationRequest implements ActionRequest {
|
||||
|
||||
private boolean threadedListener = false;
|
||||
private boolean threadedOperation = true;
|
||||
|
||||
protected SingleCustomOperationRequest() {
|
||||
}
|
||||
|
||||
@Override public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should the listener be called on a separate thread if needed.
|
||||
*/
|
||||
@Override public boolean listenerThreaded() {
|
||||
return threadedListener;
|
||||
}
|
||||
|
||||
@Override public SingleCustomOperationRequest listenerThreaded(boolean threadedListener) {
|
||||
this.threadedListener = threadedListener;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Controls if the operation will be executed on a separate thread when executed locally.
|
||||
*/
|
||||
public boolean operationThreaded() {
|
||||
return threadedOperation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Controls if the operation will be executed on a separate thread when executed locally.
|
||||
*/
|
||||
public SingleCustomOperationRequest operationThreaded(boolean threadedOperation) {
|
||||
this.threadedOperation = threadedOperation;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
// no need to pass threading over the network, they are always false when coming throw a thread pool
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,298 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.single.custom;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.NoShardAvailableActionException;
|
||||
import org.elasticsearch.action.support.BaseAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.*;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public abstract class TransportSingleCustomOperationAction<Request extends SingleCustomOperationRequest, Response extends ActionResponse> extends BaseAction<Request, Response> {
|
||||
|
||||
protected final ClusterService clusterService;
|
||||
|
||||
protected final TransportService transportService;
|
||||
|
||||
protected final ThreadPool threadPool;
|
||||
|
||||
protected TransportSingleCustomOperationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.transportService = transportService;
|
||||
this.threadPool = threadPool;
|
||||
|
||||
transportService.registerHandler(transportAction(), new TransportHandler());
|
||||
transportService.registerHandler(transportShardAction(), new ShardTransportHandler());
|
||||
}
|
||||
|
||||
@Override protected void doExecute(Request request, ActionListener<Response> listener) {
|
||||
new AsyncSingleAction(request, listener).start();
|
||||
}
|
||||
|
||||
protected abstract String transportAction();
|
||||
|
||||
protected abstract String transportShardAction();
|
||||
|
||||
protected abstract ShardsIterator shards(ClusterState state, Request request);
|
||||
|
||||
protected abstract Response shardOperation(Request request, int shardId) throws ElasticSearchException;
|
||||
|
||||
protected abstract Request newRequest();
|
||||
|
||||
protected abstract Response newResponse();
|
||||
|
||||
protected void checkBlock(Request request, ClusterState state) {
|
||||
|
||||
}
|
||||
|
||||
private class AsyncSingleAction {
|
||||
|
||||
private final ActionListener<Response> listener;
|
||||
|
||||
private final ShardsIterator shardsIt;
|
||||
|
||||
private final Request request;
|
||||
|
||||
private final DiscoveryNodes nodes;
|
||||
|
||||
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
|
||||
ClusterState clusterState = clusterService.state();
|
||||
nodes = clusterState.nodes();
|
||||
|
||||
checkBlock(request, clusterState);
|
||||
this.shardsIt = shards(clusterState, request);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
performFirst();
|
||||
}
|
||||
|
||||
private void onFailure(ShardRouting shardRouting, Exception e) {
|
||||
if (logger.isTraceEnabled() && e != null) {
|
||||
logger.trace(shardRouting.shortSummary() + ": Failed to execute [" + request + "]", e);
|
||||
}
|
||||
perform(e);
|
||||
}
|
||||
|
||||
/**
|
||||
* First get should try and use a shard that exists on a local node for better performance
|
||||
*/
|
||||
private void performFirst() {
|
||||
while (shardsIt.hasNextActive()) {
|
||||
final ShardRouting shard = shardsIt.nextActive();
|
||||
if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
if (request.operationThreaded()) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
try {
|
||||
Response response = shardOperation(request, shard.id());
|
||||
listener.onResponse(response);
|
||||
} catch (Exception e) {
|
||||
onFailure(shard, e);
|
||||
}
|
||||
}
|
||||
});
|
||||
return;
|
||||
} else {
|
||||
try {
|
||||
final Response response = shardOperation(request, shard.id());
|
||||
if (request.listenerThreaded()) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
onFailure(shard, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!shardsIt.hasNextActive()) {
|
||||
// no local node get, go remote
|
||||
shardsIt.reset();
|
||||
perform(null);
|
||||
}
|
||||
}
|
||||
|
||||
private void perform(final Exception lastException) {
|
||||
while (shardsIt.hasNextActive()) {
|
||||
final ShardRouting shard = shardsIt.nextActive();
|
||||
// no need to check for local nodes, we tried them already in performFirstGet
|
||||
if (!shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
DiscoveryNode node = nodes.get(shard.currentNodeId());
|
||||
transportService.sendRequest(node, transportShardAction(), new ShardSingleOperationRequest(request, shard.id()), new BaseTransportResponseHandler<Response>() {
|
||||
@Override public Response newInstance() {
|
||||
return newResponse();
|
||||
}
|
||||
|
||||
@Override public void handleResponse(final Response response) {
|
||||
if (request.listenerThreaded()) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void handleException(TransportException exp) {
|
||||
onFailure(shard, exp);
|
||||
}
|
||||
|
||||
@Override public boolean spawn() {
|
||||
// no need to spawn, we will execute the listener on a different thread if needed in handleResponse
|
||||
return false;
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (!shardsIt.hasNextActive()) {
|
||||
Exception failure = lastException;
|
||||
if (failure == null) {
|
||||
failure = new NoShardAvailableActionException(null, "No shard available for [" + request + "]");
|
||||
} else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("failed to execute [" + request + "]", failure);
|
||||
}
|
||||
}
|
||||
if (request.listenerThreaded()) {
|
||||
final Exception fFailure = failure;
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
listener.onFailure(fFailure);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
listener.onFailure(failure);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class TransportHandler extends BaseTransportRequestHandler<Request> {
|
||||
|
||||
@Override public Request newInstance() {
|
||||
return newRequest();
|
||||
}
|
||||
|
||||
@Override public void messageReceived(Request request, final TransportChannel channel) throws Exception {
|
||||
// no need to have a threaded listener since we just send back a response
|
||||
request.listenerThreaded(false);
|
||||
// if we have a local operation, execute it on a thread since we don't spawn
|
||||
request.operationThreaded(true);
|
||||
execute(request, new ActionListener<Response>() {
|
||||
@Override public void onResponse(Response result) {
|
||||
try {
|
||||
channel.sendResponse(result);
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void onFailure(Throwable e) {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (Exception e1) {
|
||||
logger.warn("Failed to send response for get", e1);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override public boolean spawn() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private class ShardTransportHandler extends BaseTransportRequestHandler<ShardSingleOperationRequest> {
|
||||
|
||||
@Override public ShardSingleOperationRequest newInstance() {
|
||||
return new ShardSingleOperationRequest();
|
||||
}
|
||||
|
||||
@Override public void messageReceived(ShardSingleOperationRequest request, TransportChannel channel) throws Exception {
|
||||
Response response = shardOperation(request.request(), request.shardId());
|
||||
channel.sendResponse(response);
|
||||
}
|
||||
}
|
||||
|
||||
protected class ShardSingleOperationRequest implements Streamable {
|
||||
|
||||
private Request request;
|
||||
|
||||
private int shardId;
|
||||
|
||||
ShardSingleOperationRequest() {
|
||||
}
|
||||
|
||||
public ShardSingleOperationRequest(Request request, int shardId) {
|
||||
this.request = request;
|
||||
this.shardId = shardId;
|
||||
}
|
||||
|
||||
public Request request() {
|
||||
return request;
|
||||
}
|
||||
|
||||
public int shardId() {
|
||||
return shardId;
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
request = newRequest();
|
||||
request.readFrom(in);
|
||||
shardId = in.readVInt();
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
request.writeTo(out);
|
||||
out.writeVInt(shardId);
|
||||
}
|
||||
}
|
||||
}
|
@ -23,6 +23,8 @@ import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
|
||||
import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequest;
|
||||
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
|
||||
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
|
||||
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse;
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
|
||||
@ -50,6 +52,7 @@ import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest;
|
||||
import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse;
|
||||
import org.elasticsearch.client.action.admin.indices.alias.IndicesAliasesRequestBuilder;
|
||||
import org.elasticsearch.client.action.admin.indices.analyze.AnalyzeRequestBuilder;
|
||||
import org.elasticsearch.client.action.admin.indices.cache.clear.ClearIndicesCacheRequestBuilder;
|
||||
import org.elasticsearch.client.action.admin.indices.close.CloseIndexRequestBuilder;
|
||||
import org.elasticsearch.client.action.admin.indices.create.CreateIndexRequestBuilder;
|
||||
@ -399,4 +402,16 @@ public interface IndicesAdminClient {
|
||||
* Update indices settings.
|
||||
*/
|
||||
UpdateSettingsRequestBuilder prepareUpdateSettings(String... indices);
|
||||
|
||||
ActionFuture<AnalyzeResponse> analyze(AnalyzeRequest request);
|
||||
|
||||
void analyze(AnalyzeRequest request, ActionListener<AnalyzeResponse> listener);
|
||||
|
||||
/**
|
||||
* Analyze text under the provided index.
|
||||
*
|
||||
* @param index The index name
|
||||
* @param text The text to analyze
|
||||
*/
|
||||
AnalyzeRequestBuilder prepareAnalyzer(String index, String text);
|
||||
}
|
||||
|
@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.action.admin.indices.analyze;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequest;
|
||||
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.client.action.admin.indices.support.BaseIndicesRequestBuilder;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class AnalyzeRequestBuilder extends BaseIndicesRequestBuilder<AnalyzeRequest, AnalyzeResponse> {
|
||||
|
||||
public AnalyzeRequestBuilder(IndicesAdminClient indicesClient, String index, String text) {
|
||||
super(indicesClient, new AnalyzeRequest(index, text));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the analyzer name to use in order to analyze the text.
|
||||
*
|
||||
* @param analyzer The analyzer name.
|
||||
*/
|
||||
public AnalyzeRequestBuilder setAnalyzer(String analyzer) {
|
||||
request.analyzer(analyzer);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override protected void doExecute(ActionListener<AnalyzeResponse> listener) {
|
||||
client.analyze(request, listener);
|
||||
}
|
||||
}
|
@ -24,6 +24,9 @@ import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
|
||||
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
|
||||
import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequest;
|
||||
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
|
||||
import org.elasticsearch.action.admin.indices.analyze.TransportAnalyzeAction;
|
||||
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
|
||||
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse;
|
||||
import org.elasticsearch.action.admin.indices.cache.clear.TransportClearIndicesCacheAction;
|
||||
@ -104,13 +107,15 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
|
||||
|
||||
private final TransportUpdateSettingsAction updateSettingsAction;
|
||||
|
||||
private final TransportAnalyzeAction analyzeAction;
|
||||
|
||||
@Inject public NodeIndicesAdminClient(Settings settings, ThreadPool threadPool, TransportIndicesStatusAction indicesStatusAction,
|
||||
TransportCreateIndexAction createIndexAction, TransportDeleteIndexAction deleteIndexAction,
|
||||
TransportCloseIndexAction closeIndexAction, TransportOpenIndexAction openIndexAction,
|
||||
TransportRefreshAction refreshAction, TransportFlushAction flushAction, TransportOptimizeAction optimizeAction,
|
||||
TransportPutMappingAction putMappingAction, TransportDeleteMappingAction deleteMappingAction, TransportGatewaySnapshotAction gatewaySnapshotAction,
|
||||
TransportIndicesAliasesAction indicesAliasesAction, TransportClearIndicesCacheAction clearIndicesCacheAction,
|
||||
TransportUpdateSettingsAction updateSettingsAction) {
|
||||
TransportUpdateSettingsAction updateSettingsAction, TransportAnalyzeAction analyzeAction) {
|
||||
this.threadPool = threadPool;
|
||||
this.indicesStatusAction = indicesStatusAction;
|
||||
this.createIndexAction = createIndexAction;
|
||||
@ -126,6 +131,7 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
|
||||
this.indicesAliasesAction = indicesAliasesAction;
|
||||
this.clearIndicesCacheAction = clearIndicesCacheAction;
|
||||
this.updateSettingsAction = updateSettingsAction;
|
||||
this.analyzeAction = analyzeAction;
|
||||
}
|
||||
|
||||
@Override public ThreadPool threadPool() {
|
||||
@ -243,4 +249,12 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
|
||||
@Override public void updateSettings(UpdateSettingsRequest request, ActionListener<UpdateSettingsResponse> listener) {
|
||||
updateSettingsAction.execute(request, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<AnalyzeResponse> analyze(AnalyzeRequest request) {
|
||||
return analyzeAction.execute(request);
|
||||
}
|
||||
|
||||
@Override public void analyze(AnalyzeRequest request, ActionListener<AnalyzeResponse> listener) {
|
||||
analyzeAction.execute(request, listener);
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.client.support;
|
||||
|
||||
import org.elasticsearch.client.action.admin.indices.alias.IndicesAliasesRequestBuilder;
|
||||
import org.elasticsearch.client.action.admin.indices.analyze.AnalyzeRequestBuilder;
|
||||
import org.elasticsearch.client.action.admin.indices.cache.clear.ClearIndicesCacheRequestBuilder;
|
||||
import org.elasticsearch.client.action.admin.indices.close.CloseIndexRequestBuilder;
|
||||
import org.elasticsearch.client.action.admin.indices.create.CreateIndexRequestBuilder;
|
||||
@ -95,4 +96,8 @@ public abstract class AbstractIndicesAdminClient implements InternalIndicesAdmin
|
||||
@Override public UpdateSettingsRequestBuilder prepareUpdateSettings(String... indices) {
|
||||
return new UpdateSettingsRequestBuilder(this).setIndices(indices);
|
||||
}
|
||||
|
||||
@Override public AnalyzeRequestBuilder prepareAnalyzer(String index, String text) {
|
||||
return new AnalyzeRequestBuilder(this, index, text);
|
||||
}
|
||||
}
|
||||
|
@ -29,6 +29,7 @@ import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.
|
||||
import org.elasticsearch.client.transport.action.admin.cluster.ping.single.ClientTransportSinglePingAction;
|
||||
import org.elasticsearch.client.transport.action.admin.cluster.state.ClientTransportClusterStateAction;
|
||||
import org.elasticsearch.client.transport.action.admin.indices.alias.ClientTransportIndicesAliasesAction;
|
||||
import org.elasticsearch.client.transport.action.admin.indices.analyze.ClientTransportAnalyzeAction;
|
||||
import org.elasticsearch.client.transport.action.admin.indices.cache.clear.ClientTransportClearIndicesCacheAction;
|
||||
import org.elasticsearch.client.transport.action.admin.indices.close.ClientTransportCloseIndexAction;
|
||||
import org.elasticsearch.client.transport.action.admin.indices.create.ClientTransportCreateIndexAction;
|
||||
@ -81,6 +82,7 @@ public class ClientTransportActionModule extends AbstractModule {
|
||||
bind(ClientTransportIndicesAliasesAction.class).asEagerSingleton();
|
||||
bind(ClientTransportClearIndicesCacheAction.class).asEagerSingleton();
|
||||
bind(ClientTransportUpdateSettingsAction.class).asEagerSingleton();
|
||||
bind(ClientTransportAnalyzeAction.class).asEagerSingleton();
|
||||
|
||||
bind(ClientTransportNodesInfoAction.class).asEagerSingleton();
|
||||
bind(ClientTransportNodesStatsAction.class).asEagerSingleton();
|
||||
|
@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.transport.action.admin.indices.analyze;
|
||||
|
||||
import org.elasticsearch.action.TransportActions;
|
||||
import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequest;
|
||||
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
|
||||
import org.elasticsearch.client.transport.action.support.BaseClientTransportAction;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class ClientTransportAnalyzeAction extends BaseClientTransportAction<AnalyzeRequest, AnalyzeResponse> {
|
||||
|
||||
@Inject public ClientTransportAnalyzeAction(Settings settings, TransportService transportService) {
|
||||
super(settings, transportService, AnalyzeResponse.class);
|
||||
}
|
||||
|
||||
@Override protected String action() {
|
||||
return TransportActions.Admin.Indices.ANALYZE;
|
||||
}
|
||||
}
|
@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
|
||||
import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequest;
|
||||
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
|
||||
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
|
||||
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse;
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
|
||||
@ -54,6 +56,7 @@ import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.client.support.AbstractIndicesAdminClient;
|
||||
import org.elasticsearch.client.transport.TransportClientNodesService;
|
||||
import org.elasticsearch.client.transport.action.admin.indices.alias.ClientTransportIndicesAliasesAction;
|
||||
import org.elasticsearch.client.transport.action.admin.indices.analyze.ClientTransportAnalyzeAction;
|
||||
import org.elasticsearch.client.transport.action.admin.indices.cache.clear.ClientTransportClearIndicesCacheAction;
|
||||
import org.elasticsearch.client.transport.action.admin.indices.close.ClientTransportCloseIndexAction;
|
||||
import org.elasticsearch.client.transport.action.admin.indices.create.ClientTransportCreateIndexAction;
|
||||
@ -109,6 +112,8 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
||||
|
||||
private final ClientTransportUpdateSettingsAction updateSettingsAction;
|
||||
|
||||
private final ClientTransportAnalyzeAction analyzeAction;
|
||||
|
||||
@Inject public InternalTransportIndicesAdminClient(Settings settings, TransportClientNodesService nodesService, ThreadPool threadPool,
|
||||
ClientTransportIndicesStatusAction indicesStatusAction,
|
||||
ClientTransportCreateIndexAction createIndexAction, ClientTransportDeleteIndexAction deleteIndexAction,
|
||||
@ -116,7 +121,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
||||
ClientTransportRefreshAction refreshAction, ClientTransportFlushAction flushAction, ClientTransportOptimizeAction optimizeAction,
|
||||
ClientTransportPutMappingAction putMappingAction, ClientTransportDeleteMappingAction deleteMappingAction, ClientTransportGatewaySnapshotAction gatewaySnapshotAction,
|
||||
ClientTransportIndicesAliasesAction indicesAliasesAction, ClientTransportClearIndicesCacheAction clearIndicesCacheAction,
|
||||
ClientTransportUpdateSettingsAction updateSettingsAction) {
|
||||
ClientTransportUpdateSettingsAction updateSettingsAction, ClientTransportAnalyzeAction analyzeAction) {
|
||||
this.nodesService = nodesService;
|
||||
this.threadPool = threadPool;
|
||||
this.indicesStatusAction = indicesStatusAction;
|
||||
@ -133,6 +138,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
||||
this.indicesAliasesAction = indicesAliasesAction;
|
||||
this.clearIndicesCacheAction = clearIndicesCacheAction;
|
||||
this.updateSettingsAction = updateSettingsAction;
|
||||
this.analyzeAction = analyzeAction;
|
||||
}
|
||||
|
||||
@Override public ThreadPool threadPool() {
|
||||
@ -361,7 +367,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
||||
}
|
||||
|
||||
@Override public ActionFuture<UpdateSettingsResponse> updateSettings(final UpdateSettingsRequest request) {
|
||||
return nodesService.execute(new TransportClientNodesService.NodeCallback<org.elasticsearch.action.ActionFuture<UpdateSettingsResponse>>() {
|
||||
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<UpdateSettingsResponse>>() {
|
||||
@Override public ActionFuture<UpdateSettingsResponse> doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
return updateSettingsAction.execute(node, request);
|
||||
}
|
||||
@ -376,4 +382,21 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override public ActionFuture<AnalyzeResponse> analyze(final AnalyzeRequest request) {
|
||||
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<AnalyzeResponse>>() {
|
||||
@Override public ActionFuture<AnalyzeResponse> doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
return analyzeAction.execute(node, request);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override public void analyze(final AnalyzeRequest request, final ActionListener<AnalyzeResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
analyzeAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing;
|
||||
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.collect.Sets;
|
||||
import org.elasticsearch.common.collect.UnmodifiableIterator;
|
||||
@ -30,6 +31,7 @@ import org.elasticsearch.common.util.concurrent.Immutable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.elasticsearch.common.collect.Lists.*;
|
||||
|
||||
@ -45,9 +47,20 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
||||
// shards with state set to UNASSIGNED
|
||||
private final ImmutableMap<Integer, IndexShardRoutingTable> shards;
|
||||
|
||||
private final ImmutableList<ShardRouting> allShards;
|
||||
|
||||
private final AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
IndexRoutingTable(String index, Map<Integer, IndexShardRoutingTable> shards) {
|
||||
this.index = index;
|
||||
this.shards = ImmutableMap.copyOf(shards);
|
||||
ImmutableList.Builder<ShardRouting> allShards = ImmutableList.builder();
|
||||
for (IndexShardRoutingTable indexShardRoutingTable : shards.values()) {
|
||||
for (ShardRouting shardRouting : indexShardRoutingTable) {
|
||||
allShards.add(shardRouting);
|
||||
}
|
||||
}
|
||||
this.allShards = allShards.build();
|
||||
}
|
||||
|
||||
public String index() {
|
||||
@ -137,6 +150,13 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
||||
return shards;
|
||||
}
|
||||
|
||||
/**
|
||||
* An iterator over all shards (including replicas).
|
||||
*/
|
||||
public ShardsIterator allShardsIt() {
|
||||
return new PlainShardsIterator(allShards, Math.abs(counter.incrementAndGet()));
|
||||
}
|
||||
|
||||
/**
|
||||
* A group shards iterator where each group ({@link ShardIterator}
|
||||
* is an iterator across shard replication group.
|
||||
|
@ -37,7 +37,7 @@ public class IndexException extends ElasticSearchException {
|
||||
}
|
||||
|
||||
protected IndexException(Index index, boolean withSpace, String msg, Throwable cause) {
|
||||
super("[" + index.name() + "]" + (withSpace ? " " : "") + msg, cause);
|
||||
super("[" + (index == null ? "_na" : index.name()) + "]" + (withSpace ? " " : "") + msg, cause);
|
||||
this.index = index;
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,7 @@ import org.elasticsearch.common.inject.Injector;
|
||||
import org.elasticsearch.index.CloseableIndexComponent;
|
||||
import org.elasticsearch.index.IndexComponent;
|
||||
import org.elasticsearch.index.IndexShardMissingException;
|
||||
import org.elasticsearch.index.analysis.AnalysisService;
|
||||
import org.elasticsearch.index.cache.IndexCache;
|
||||
import org.elasticsearch.index.engine.IndexEngine;
|
||||
import org.elasticsearch.index.gateway.IndexGateway;
|
||||
@ -45,6 +46,8 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard>, Clos
|
||||
|
||||
IndexCache cache();
|
||||
|
||||
AnalysisService analysisService();
|
||||
|
||||
MapperService mapperService();
|
||||
|
||||
IndexQueryParserService queryParserService();
|
||||
|
@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.gateway.none.NoneGateway;
|
||||
import org.elasticsearch.index.*;
|
||||
import org.elasticsearch.index.analysis.AnalysisService;
|
||||
import org.elasticsearch.index.cache.IndexCache;
|
||||
import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
@ -89,6 +90,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
|
||||
|
||||
private final InternalIndicesLifecycle indicesLifecycle;
|
||||
|
||||
private final AnalysisService analysisService;
|
||||
|
||||
private final MapperService mapperService;
|
||||
|
||||
private final IndexQueryParserService queryParserService;
|
||||
@ -110,13 +113,14 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
|
||||
private final CleanCacheOnIndicesLifecycleListener cleanCacheOnIndicesLifecycleListener = new CleanCacheOnIndicesLifecycleListener();
|
||||
|
||||
@Inject public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool,
|
||||
MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService,
|
||||
AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService,
|
||||
IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore) {
|
||||
super(index, indexSettings);
|
||||
this.injector = injector;
|
||||
this.nodeEnv = nodeEnv;
|
||||
this.threadPool = threadPool;
|
||||
this.indexSettings = indexSettings;
|
||||
this.analysisService = analysisService;
|
||||
this.mapperService = mapperService;
|
||||
this.queryParserService = queryParserService;
|
||||
this.similarityService = similarityService;
|
||||
@ -175,6 +179,10 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
|
||||
return indexCache;
|
||||
}
|
||||
|
||||
@Override public AnalysisService analysisService() {
|
||||
return this.analysisService;
|
||||
}
|
||||
|
||||
@Override public MapperService mapperService() {
|
||||
return mapperService;
|
||||
}
|
||||
|
@ -33,7 +33,7 @@ public class IndexShardException extends IndexException {
|
||||
}
|
||||
|
||||
public IndexShardException(ShardId shardId, String msg, Throwable cause) {
|
||||
super(shardId.index(), false, "[" + shardId.id() + "] " + msg, cause);
|
||||
super(shardId == null ? null : shardId.index(), false, "[" + shardId == null ? "_na" : shardId.id() + "] " + msg, cause);
|
||||
this.shardId = shardId;
|
||||
}
|
||||
|
||||
|
@ -30,6 +30,7 @@ import org.elasticsearch.rest.action.admin.cluster.ping.replication.RestReplicat
|
||||
import org.elasticsearch.rest.action.admin.cluster.ping.single.RestSinglePingAction;
|
||||
import org.elasticsearch.rest.action.admin.cluster.state.RestClusterStateAction;
|
||||
import org.elasticsearch.rest.action.admin.indices.alias.RestIndicesAliasesAction;
|
||||
import org.elasticsearch.rest.action.admin.indices.analyze.RestAnalyzeAction;
|
||||
import org.elasticsearch.rest.action.admin.indices.cache.clear.RestClearIndicesCacheAction;
|
||||
import org.elasticsearch.rest.action.admin.indices.close.RestCloseIndexAction;
|
||||
import org.elasticsearch.rest.action.admin.indices.create.RestCreateIndexAction;
|
||||
@ -81,6 +82,7 @@ public class RestActionModule extends AbstractModule {
|
||||
bind(RestCloseIndexAction.class).asEagerSingleton();
|
||||
bind(RestOpenIndexAction.class).asEagerSingleton();
|
||||
bind(RestUpdateSettingsAction.class).asEagerSingleton();
|
||||
bind(RestAnalyzeAction.class).asEagerSingleton();
|
||||
|
||||
bind(RestPutMappingAction.class).asEagerSingleton();
|
||||
bind(RestDeleteMappingAction.class).asEagerSingleton();
|
||||
|
@ -0,0 +1,87 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.rest.action.admin.indices.analyze;
|
||||
|
||||
import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequest;
|
||||
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.rest.*;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.rest.RestRequest.Method.*;
|
||||
import static org.elasticsearch.rest.RestResponse.Status.*;
|
||||
import static org.elasticsearch.rest.action.support.RestXContentBuilder.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class RestAnalyzeAction extends BaseRestHandler {
|
||||
|
||||
@Inject public RestAnalyzeAction(Settings settings, Client client, RestController controller) {
|
||||
super(settings, client);
|
||||
controller.registerHandler(GET, "/{index}/_analyze", this);
|
||||
controller.registerHandler(POST, "/{index}/_analyze", this);
|
||||
}
|
||||
|
||||
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
|
||||
String text = request.param("text");
|
||||
if (text == null && request.hasContent()) {
|
||||
text = request.contentAsString();
|
||||
}
|
||||
if (text == null) {
|
||||
try {
|
||||
channel.sendResponse(new XContentThrowableRestResponse(request, new ElasticSearchIllegalArgumentException("text is missing")));
|
||||
} catch (IOException e1) {
|
||||
logger.warn("Failed to send response", e1);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
AnalyzeRequest analyzeRequest = new AnalyzeRequest(request.param("index"), text);
|
||||
analyzeRequest.analyzer(request.param("analyzer"));
|
||||
client.admin().indices().analyze(analyzeRequest, new ActionListener<AnalyzeResponse>() {
|
||||
@Override public void onResponse(AnalyzeResponse response) {
|
||||
try {
|
||||
XContentBuilder builder = restContentBuilder(request);
|
||||
builder.startObject();
|
||||
response.toXContent(builder, request);
|
||||
builder.endObject();
|
||||
channel.sendResponse(new XContentRestResponse(request, OK, builder));
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void onFailure(Throwable e) {
|
||||
try {
|
||||
channel.sendResponse(new XContentThrowableRestResponse(request, e));
|
||||
} catch (IOException e1) {
|
||||
logger.error("Failed to send failure response", e1);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,73 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.test.integration.indices.analyze;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.test.integration.AbstractNodesTests;
|
||||
import org.testng.annotations.AfterClass;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.*;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class AnalyzeActionTests extends AbstractNodesTests {
|
||||
|
||||
private Client client;
|
||||
|
||||
@BeforeClass public void createNodes() throws Exception {
|
||||
startNode("server1");
|
||||
startNode("server2");
|
||||
client = getClient();
|
||||
}
|
||||
|
||||
@AfterClass public void closeNodes() {
|
||||
client.close();
|
||||
closeAllNodes();
|
||||
}
|
||||
|
||||
protected Client getClient() {
|
||||
return client("server1");
|
||||
}
|
||||
|
||||
@Test public void simpleAnalyzerTests() throws Exception {
|
||||
try {
|
||||
client.admin().indices().prepareDelete("test").execute().actionGet();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
|
||||
client.admin().indices().prepareCreate("test").execute().actionGet();
|
||||
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
AnalyzeResponse analyzeResponse = client.admin().indices().prepareAnalyzer("test", "this is a test").execute().actionGet();
|
||||
assertThat(analyzeResponse.tokens().size(), equalTo(1));
|
||||
AnalyzeResponse.AnalyzeToken token = analyzeResponse.tokens().get(0);
|
||||
assertThat(token.term(), equalTo("test"));
|
||||
assertThat(token.startOffset(), equalTo(10));
|
||||
assertThat(token.endOffset(), equalTo(14));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user