Update API: Allow to update a document based on a script, closes #1583.

This commit is contained in:
Shay Banon 2012-01-02 22:02:19 +02:00
parent e582f6c91a
commit 83d5084f62
44 changed files with 1906 additions and 144 deletions

View File

@ -71,6 +71,7 @@ import org.elasticsearch.action.percolate.TransportPercolateAction;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.action.search.type.*;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.common.inject.AbstractModule;
/**
@ -126,6 +127,7 @@ public class TransportActionModule extends AbstractModule {
bind(TransportIndexDeleteAction.class).asEagerSingleton();
bind(TransportShardDeleteAction.class).asEagerSingleton();
bind(TransportCountAction.class).asEagerSingleton();
bind(TransportUpdateAction.class).asEagerSingleton();
bind(TransportMultiGetAction.class).asEagerSingleton();
bind(TransportShardMultiGetAction.class).asEagerSingleton();

View File

@ -28,6 +28,8 @@ public class TransportActions {
public static final String INDEX = "indices/index/shard/index";
public static final String UPDATE = "update";
public static final String COUNT = "indices/count";
public static final String DELETE = "indices/index/shard/delete";

View File

@ -0,0 +1,108 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.single.instance;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.Actions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
*
*/
public abstract class InstanceShardOperationRequest implements ActionRequest {
public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;
// -1 means its not set, allows to explicitly direct a request to a specific shard
protected int shardId = -1;
private boolean threadedListener = false;
protected InstanceShardOperationRequest() {
}
public InstanceShardOperationRequest(String index) {
this.index = index;
}
public TimeValue timeout() {
return timeout;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (index == null) {
validationException = Actions.addValidationError("index is missing", validationException);
}
return validationException;
}
public String index() {
return index;
}
InstanceShardOperationRequest index(String index) {
this.index = index;
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/
@Override
public boolean listenerThreaded() {
return threadedListener;
}
@Override
public InstanceShardOperationRequest listenerThreaded(boolean threadedListener) {
this.threadedListener = threadedListener;
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
shardId = in.readInt();
timeout = TimeValue.readTimeValue(in);
// no need to pass threading over the network, they are always false when coming throw a thread pool
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
out.writeInt(shardId);
timeout.writeTo(out);
}
public void beforeLocalFork() {
}
}

View File

@ -0,0 +1,306 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.single.instance;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
*/
public abstract class TransportInstanceSingleOperationAction<Request extends InstanceShardOperationRequest, Response extends ActionResponse> extends BaseAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportService transportService;
final String transportAction;
final String executor;
protected TransportInstanceSingleOperationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) {
super(settings, threadPool);
this.clusterService = clusterService;
this.transportService = transportService;
this.transportAction = transportAction();
this.executor = executor();
transportService.registerHandler(transportAction, new TransportHandler());
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncSingleAction(request, listener).start();
}
protected abstract String executor();
protected abstract String transportAction();
protected abstract void shardOperation(Request request, ActionListener<Response> listener) throws ElasticSearchException;
protected abstract Request newRequest();
protected abstract Response newResponse();
protected void checkBlock(Request request, ClusterState state) {
}
protected boolean retryRequired(ShardIterator shardIt, ClusterState state) {
return false;
}
protected boolean retryOnFailure(Throwable e) {
return false;
}
protected TransportRequestOptions transportOptions() {
return TransportRequestOptions.EMPTY;
}
/**
* Should return an iterator with a single shard!
*/
protected abstract ShardIterator shards(ClusterState clusterState, Request request) throws ElasticSearchException;
class AsyncSingleAction {
private final ActionListener<Response> listener;
private final Request request;
private ShardIterator shardIt;
private DiscoveryNodes nodes;
private final AtomicBoolean operationStarted = new AtomicBoolean();
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
ClusterState clusterState = clusterService.state();
nodes = clusterState.nodes();
request.index(clusterState.metaData().concreteIndex(request.index()));
checkBlock(request, clusterState);
}
public void start() {
start(false);
}
public boolean start(final boolean fromClusterEvent) throws ElasticSearchException {
final ClusterState clusterState = clusterService.state();
nodes = clusterState.nodes();
if (!clusterState.routingTable().hasIndex(request.index())) {
retry(fromClusterEvent);
return false;
}
try {
shardIt = shards(clusterState, request);
} catch (Exception e) {
listener.onFailure(e);
return true;
}
// no shardIt, might be in the case between index gateway recovery and shardIt initialization
if (shardIt.size() == 0) {
retry(fromClusterEvent);
return false;
}
// this transport only make sense with an iterator that returns a single shard routing (like primary)
assert shardIt.size() == 1;
ShardRouting shard = shardIt.nextOrNull();
assert shard != null;
if (!shard.active()) {
retry(fromClusterEvent);
return false;
}
if (!operationStarted.compareAndSet(false, true)) {
return true;
}
request.shardId = shardIt.shardId().id();
if (shard.currentNodeId().equals(nodes.localNodeId())) {
request.beforeLocalFork();
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
shardOperation(request, listener);
} catch (Exception e) {
if (retryOnFailure(e)) {
retry(fromClusterEvent);
} else {
listener.onFailure(e);
}
}
}
});
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
transportService.sendRequest(node, transportAction, request, transportOptions(), new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
retryOnFailure(exp)) {
operationStarted.set(false);
// we already marked it as started when we executed it (removed the listener) so pass false
// to re-add to the cluster listener
retry(false);
} else {
listener.onFailure(exp);
}
}
});
}
return true;
}
void retry(boolean fromClusterEvent) {
if (!fromClusterEvent) {
// make it threaded operation so we fork on the discovery listener thread
request.beforeLocalFork();
clusterService.add(request.timeout(), new TimeoutClusterStateListener() {
@Override
public void postAdded() {
if (start(true)) {
// if we managed to start and perform the operation on the primary, we can remove this listener
clusterService.remove(this);
}
}
@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(nodes.localNode()));
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (start(true)) {
// if we managed to start and perform the operation on the primary, we can remove this listener
clusterService.remove(this);
}
}
@Override
public void onTimeout(TimeValue timeValue) {
// just to be on the safe side, see if we can start it now?
if (start(true)) {
clusterService.remove(this);
return;
}
clusterService.remove(this);
final UnavailableShardsException failure;
if (shardIt == null) {
failure = new UnavailableShardsException(new ShardId(request.index(), -1), "Timeout waiting for [" + timeValue + "], request: " + request.toString());
} else {
failure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString());
}
listener.onFailure(failure);
}
});
}
}
}
class TransportHandler extends BaseTransportRequestHandler<Request> {
@Override
public Request newInstance() {
return newRequest();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void messageReceived(Request request, final TransportChannel channel) throws Exception {
// no need to have a threaded listener since we just send back a response
request.listenerThreaded(false);
execute(request, new ActionListener<Response>() {
@Override
public void onResponse(Response result) {
try {
channel.sendResponse(result);
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response for get", e1);
}
}
});
}
}
}

View File

@ -0,0 +1,254 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.update;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.DocumentMissingEngineException;
import org.elasticsearch.index.engine.DocumentSourceMissingEngineException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.HashMap;
import java.util.Map;
/**
*/
public class TransportUpdateAction extends TransportInstanceSingleOperationAction<UpdateRequest, UpdateResponse> {
private final IndicesService indicesService;
private final TransportDeleteAction deleteAction;
private final TransportIndexAction indexAction;
private final ScriptService scriptService;
@Inject
public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, TransportIndexAction indexAction, TransportDeleteAction deleteAction, ScriptService scriptService) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
this.indexAction = indexAction;
this.deleteAction = deleteAction;
this.scriptService = scriptService;
}
@Override
protected String transportAction() {
return TransportActions.UPDATE;
}
@Override
protected String executor() {
return ThreadPool.Names.INDEX;
}
@Override
protected UpdateRequest newRequest() {
return new UpdateRequest();
}
@Override
protected UpdateResponse newResponse() {
return new UpdateResponse();
}
@Override
protected void checkBlock(UpdateRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
}
@Override
protected boolean retryOnFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof IllegalIndexShardStateException) {
return true;
}
return false;
}
@Override
protected ShardIterator shards(ClusterState clusterState, UpdateRequest request) throws ElasticSearchException {
if (request.shardId() != -1) {
return clusterState.routingTable().index(request.index()).shard(request.shardId()).primaryShardIt();
}
ShardIterator shardIterator = clusterService.operationRouting()
.indexShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing());
ShardRouting shard;
while ((shard = shardIterator.nextOrNull()) != null) {
if (shard.primary()) {
return new PlainShardIterator(shardIterator.shardId(), ImmutableList.of(shard));
}
}
return new PlainShardIterator(shardIterator.shardId(), ImmutableList.<ShardRouting>of());
}
@Override
protected void shardOperation(final UpdateRequest request, final ActionListener<UpdateResponse> listener) throws ElasticSearchException {
shardOperation(request, listener, 0);
}
protected void shardOperation(final UpdateRequest request, final ActionListener<UpdateResponse> listener, final int retryCount) throws ElasticSearchException {
IndexService indexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = indexService.shardSafe(request.shardId());
GetResult getResult = indexShard.getService().get(request.type(), request.id(),
new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TimestampFieldMapper.NAME}, true);
// no doc, what to do, what to do...
if (!getResult.exists()) {
listener.onFailure(new DocumentMissingEngineException(new ShardId(request.index(), request.shardId()), request.type(), request.id()));
return;
}
if (getResult.internalSourceRef() == null) {
// no source, we can't do nothing, through a failure...
listener.onFailure(new DocumentSourceMissingEngineException(new ShardId(request.index(), request.shardId()), request.type(), request.id()));
return;
}
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef().bytes(), getResult.internalSourceRef().offset(), getResult.internalSourceRef().length(), true);
Map<String, Object> source = sourceAndContent.v2();
Map<String, Object> ctx = new HashMap<String, Object>(2);
ctx.put("_source", source);
try {
ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptParams);
script.setNextVar("ctx", ctx);
script.run();
// we need to unwrap the ctx...
ctx = (Map<String, Object>) script.unwrap(ctx);
} catch (Exception e) {
throw new ElasticSearchIllegalArgumentException("failed to execute script", e);
}
String operation = (String) ctx.get("op");
source = (Map<String, Object>) ctx.get("_source");
// apply script to update the source
String routing = getResult.fields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).value().toString() : null;
String parent = getResult.fields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).value().toString() : null;
// TODO ttl/timestamp
// TODO percolate?
// TODO: external version type, does it make sense here? does not seem like it...
if (operation == null || "index".equals(operation)) {
IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.source(source, sourceAndContent.v1())
.version(getResult.version()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
indexRequest.operationThreaded(false);
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.index(), response.type(), response.id(), response.version());
listener.onResponse(update);
}
@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException) {
if ((retryCount + 1) < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
}
}
}
});
} else if ("delete".equals(operation)) {
DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.version(getResult.version()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
deleteRequest.operationThreaded(false);
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse response) {
UpdateResponse update = new UpdateResponse(response.index(), response.type(), response.id(), response.version());
listener.onResponse(update);
}
@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException) {
if ((retryCount + 1) < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
}
}
listener.onFailure(e);
}
});
} else if ("none".equals(operation)) {
listener.onResponse(new UpdateResponse(getResult.index(), getResult.type(), getResult.id(), getResult.version()));
} else {
logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script);
listener.onResponse(new UpdateResponse(getResult.index(), getResult.type(), getResult.id(), getResult.version()));
}
}
}

View File

@ -0,0 +1,325 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.update;
import com.google.common.collect.Maps;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.action.Actions.addValidationError;
/**
*/
public class UpdateRequest extends InstanceShardOperationRequest {
private String type;
private String id;
@Nullable
private String routing;
String script;
@Nullable
String scriptLang;
@Nullable
Map<String, Object> scriptParams;
int retryOnConflict = 1;
private ReplicationType replicationType = ReplicationType.DEFAULT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
UpdateRequest() {
}
public UpdateRequest(String index, String type, String id) {
this.index = index;
this.type = type;
this.id = id;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
if (type == null) {
validationException = addValidationError("type is missing", validationException);
}
if (id == null) {
validationException = addValidationError("id is missing", validationException);
}
if (script == null) {
validationException = addValidationError("script is missing", validationException);
}
return validationException;
}
/**
* Sets the index the document will exists on.
*/
public UpdateRequest index(String index) {
this.index = index;
return this;
}
/**
* The type of the indexed document.
*/
public String type() {
return type;
}
/**
* Sets the type of the indexed document.
*/
public UpdateRequest type(String type) {
this.type = type;
return this;
}
/**
* The id of the indexed document.
*/
public String id() {
return id;
}
/**
* Sets the id of the indexed document.
*/
public UpdateRequest id(String id) {
this.id = id;
return this;
}
/**
* Controls the shard routing of the request. Using this value to hash the shard
* and not the id.
*/
public UpdateRequest routing(String routing) {
if (routing != null && routing.length() == 0) {
this.routing = null;
} else {
this.routing = routing;
}
return this;
}
/**
* Sets the parent id of this document. Will simply set the routing to this value, as it is only
* used for routing with delete requests.
*/
public UpdateRequest parent(String parent) {
if (routing == null) {
routing = parent;
}
return this;
}
/**
* Controls the shard routing of the request. Using this value to hash the shard
* and not the id.
*/
public String routing() {
return this.routing;
}
int shardId() {
return this.shardId;
}
/**
* The script to execute. Note, make sure not to send different script each times and instead
* use script params if possible with the same (automatically compiled) script.
*/
public UpdateRequest script(String script) {
this.script = script;
return this;
}
/**
* The language of the script to execute.
*/
public UpdateRequest scriptLang(String scriptLang) {
this.scriptLang = scriptLang;
return this;
}
/**
* Add a script parameter.
*/
public UpdateRequest addScriptParam(String name, Object value) {
if (scriptParams == null) {
scriptParams = Maps.newHashMap();
}
scriptParams.put(name, value);
return this;
}
/**
* Sets the script parameters to use with the script.
*/
public UpdateRequest scriptParams(Map<String, Object> scriptParams) {
if (this.scriptParams == null) {
this.scriptParams = scriptParams;
} else {
this.scriptParams.putAll(scriptParams);
}
return this;
}
/**
* The script to execute. Note, make sure not to send different script each times and instead
* use script params if possible with the same (automatically compiled) script.
*/
public UpdateRequest script(String script, @Nullable Map<String, Object> scriptParams) {
this.script = script;
if (this.scriptParams != null) {
this.scriptParams.putAll(scriptParams);
} else {
this.scriptParams = scriptParams;
}
return this;
}
/**
* The script to execute. Note, make sure not to send different script each times and instead
* use script params if possible with the same (automatically compiled) script.
*
* @param script The script to execute
* @param scriptLang The script language
* @param scriptParams The script parameters
*/
public UpdateRequest script(String script, @Nullable String scriptLang, @Nullable Map<String, Object> scriptParams) {
this.script = script;
this.scriptLang = scriptLang;
if (this.scriptParams != null) {
this.scriptParams.putAll(scriptParams);
} else {
this.scriptParams = scriptParams;
}
return this;
}
/**
* Sets the number of retries of a version conflict occurs because the document was updated between
* getting it and updating it. Defaults to 1.
*/
public UpdateRequest retryOnConflict(int retryOnConflict) {
this.retryOnConflict = retryOnConflict;
return this;
}
public int retryOnConflict() {
return this.retryOnConflict;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
public UpdateRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
public UpdateRequest timeout(String timeout) {
return timeout(TimeValue.parseTimeValue(timeout, null));
}
/**
* The replication type.
*/
public ReplicationType replicationType() {
return this.replicationType;
}
/**
* Sets the replication type.
*/
public UpdateRequest replicationType(ReplicationType replicationType) {
this.replicationType = replicationType;
return this;
}
public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
}
/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
*/
public UpdateRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
replicationType = ReplicationType.fromId(in.readByte());
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
type = in.readUTF();
id = in.readUTF();
if (in.readBoolean()) {
routing = in.readUTF();
}
script = in.readUTF();
if (in.readBoolean()) {
scriptLang = in.readUTF();
}
scriptParams = in.readMap();
retryOnConflict = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(replicationType.id());
out.writeByte(consistencyLevel.id());
out.writeUTF(type);
out.writeUTF(id);
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
out.writeUTF(script);
if (scriptLang == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(scriptLang);
}
out.writeMap(scriptParams);
out.writeVInt(retryOnConflict);
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.update;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public class UpdateResponse implements ActionResponse {
private String index;
private String id;
private String type;
private long version;
public UpdateResponse() {
}
public UpdateResponse(String index, String type, String id, long version) {
this.index = index;
this.id = id;
this.type = type;
this.version = version;
}
/**
* The index the document was indexed into.
*/
public String index() {
return this.index;
}
/**
* The index the document was indexed into.
*/
public String getIndex() {
return index;
}
/**
* The type of the document indexed.
*/
public String type() {
return this.type;
}
/**
* The type of the document indexed.
*/
public String getType() {
return type;
}
/**
* The id of the document indexed.
*/
public String id() {
return this.id;
}
/**
* The id of the document indexed.
*/
public String getId() {
return id;
}
/**
* Returns the version of the doc indexed.
*/
public long version() {
return this.version;
}
/**
* Returns the version of the doc indexed.
*/
public long getVersion() {
return version();
}
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
id = in.readUTF();
type = in.readUTF();
version = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
out.writeUTF(id);
out.writeUTF(type);
out.writeLong(version);
}
}

View File

@ -41,6 +41,8 @@ import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.action.count.CountRequestBuilder;
import org.elasticsearch.client.action.delete.DeleteRequestBuilder;
@ -52,6 +54,7 @@ import org.elasticsearch.client.action.mlt.MoreLikeThisRequestBuilder;
import org.elasticsearch.client.action.percolate.PercolateRequestBuilder;
import org.elasticsearch.client.action.search.SearchRequestBuilder;
import org.elasticsearch.client.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.client.action.update.UpdateRequestBuilder;
import org.elasticsearch.common.Nullable;
/**
@ -64,7 +67,6 @@ import org.elasticsearch.common.Nullable;
* <p>A client can either be retrieved from a {@link org.elasticsearch.node.Node} started, or connected remotely
* to one or more nodes using {@link org.elasticsearch.client.transport.TransportClient}.
*
*
* @see org.elasticsearch.node.Node#client()
* @see org.elasticsearch.client.transport.TransportClient
*/
@ -109,6 +111,32 @@ public interface Client {
*/
IndexRequestBuilder prepareIndex();
/**
* Updates a document based on a script.
*
* @param request The update request
* @return The result future
*/
ActionFuture<UpdateResponse> update(UpdateRequest request);
/**
* Updates a document based on a script.
*
* @param request The update request
* @param listener A listener to be notified with a result
*/
void update(UpdateRequest request, ActionListener<UpdateResponse> listener);
/**
* Updates a document based on a script.
*/
UpdateRequestBuilder prepareUpdate();
/**
* Updates a document based on a script.
*/
UpdateRequestBuilder prepareUpdate(String index, String type, String id);
/**
* Index a document associated with a given index and type.
* <p/>

View File

@ -0,0 +1,157 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.action.update;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.action.support.BaseRequestBuilder;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Map;
/**
*/
public class UpdateRequestBuilder extends BaseRequestBuilder<UpdateRequest, UpdateResponse> {
public UpdateRequestBuilder(Client client, String index, String type, String id) {
super(client, new UpdateRequest(index, type, id));
}
/**
* Sets the index the document will exists on.
*/
public UpdateRequestBuilder setIndex(String index) {
request.index(index);
return this;
}
/**
* Sets the type of the indexed document.
*/
public UpdateRequestBuilder setType(String type) {
request.type(type);
return this;
}
/**
* Sets the id of the indexed document.
*/
public UpdateRequestBuilder setId(String id) {
request.id(id);
return this;
}
/**
* Controls the shard routing of the request. Using this value to hash the shard
* and not the id.
*/
public UpdateRequestBuilder setRouting(String routing) {
request.routing(routing);
return this;
}
public UpdateRequestBuilder setParent(String parent) {
request.parent(parent);
return this;
}
/**
* The script to execute. Note, make sure not to send different script each times and instead
* use script params if possible with the same (automatically compiled) script.
*/
public UpdateRequestBuilder setScript(String script) {
request.script(script);
return this;
}
/**
* The language of the script to execute.
*/
public UpdateRequestBuilder setScriptLang(String scriptLang) {
request.scriptLang(scriptLang);
return this;
}
/**
* Sets the script parameters to use with the script.
*/
public UpdateRequestBuilder setScriptParams(Map<String, Object> scriptParams) {
request.scriptParams(scriptParams);
return this;
}
/**
* Add a script parameter.
*/
public UpdateRequestBuilder addScriptParam(String name, Object value) {
request.addScriptParam(name, value);
return this;
}
/**
* Sets the number of retries of a version conflict occurs because the document was updated between
* getting it and updating it. Defaults to 1.
*/
public UpdateRequestBuilder setRetryOnConflict(int retryOnConflict) {
request.retryOnConflict(retryOnConflict);
return this;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
public UpdateRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return this;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
public UpdateRequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return this;
}
/**
* Sets the replication type.
*/
public UpdateRequestBuilder setReplicationType(ReplicationType replicationType) {
request.replicationType(replicationType);
return this;
}
/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
*/
public UpdateRequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
request.consistencyLevel(consistencyLevel);
return this;
}
@Override
protected void doExecute(ActionListener<UpdateResponse> listener) {
client.update(request, listener);
}
}

View File

@ -43,6 +43,9 @@ import org.elasticsearch.action.percolate.PercolateRequest;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.percolate.TransportPercolateAction;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.internal.InternalClient;
import org.elasticsearch.client.support.AbstractClient;
@ -61,6 +64,8 @@ public class NodeClient extends AbstractClient implements InternalClient {
private final TransportIndexAction indexAction;
private final TransportUpdateAction updateAction;
private final TransportDeleteAction deleteAction;
private final TransportBulkAction bulkAction;
@ -83,13 +88,14 @@ public class NodeClient extends AbstractClient implements InternalClient {
@Inject
public NodeClient(Settings settings, ThreadPool threadPool, NodeAdminClient admin,
TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportBulkAction bulkAction,
TransportIndexAction indexAction, TransportUpdateAction updateAction, TransportDeleteAction deleteAction, TransportBulkAction bulkAction,
TransportDeleteByQueryAction deleteByQueryAction, TransportGetAction getAction, TransportMultiGetAction multiGetAction, TransportCountAction countAction,
TransportSearchAction searchAction, TransportSearchScrollAction searchScrollAction,
TransportMoreLikeThisAction moreLikeThisAction, TransportPercolateAction percolateAction) {
this.threadPool = threadPool;
this.admin = admin;
this.indexAction = indexAction;
this.updateAction = updateAction;
this.deleteAction = deleteAction;
this.bulkAction = bulkAction;
this.deleteByQueryAction = deleteByQueryAction;
@ -127,6 +133,16 @@ public class NodeClient extends AbstractClient implements InternalClient {
indexAction.execute(request, listener);
}
@Override
public ActionFuture<UpdateResponse> update(UpdateRequest request) {
return updateAction.execute(request);
}
@Override
public void update(UpdateRequest request, ActionListener<UpdateResponse> listener) {
updateAction.execute(request, listener);
}
@Override
public ActionFuture<DeleteResponse> delete(DeleteRequest request) {
return deleteAction.execute(request);

View File

@ -30,6 +30,7 @@ import org.elasticsearch.client.action.mlt.MoreLikeThisRequestBuilder;
import org.elasticsearch.client.action.percolate.PercolateRequestBuilder;
import org.elasticsearch.client.action.search.SearchRequestBuilder;
import org.elasticsearch.client.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.client.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.internal.InternalClient;
import org.elasticsearch.common.Nullable;
@ -53,6 +54,16 @@ public abstract class AbstractClient implements InternalClient {
return prepareIndex().setIndex(index).setType(type).setId(id);
}
@Override
public UpdateRequestBuilder prepareUpdate() {
return new UpdateRequestBuilder(this, null, null, null);
}
@Override
public UpdateRequestBuilder prepareUpdate(String index, String type, String id) {
return new UpdateRequestBuilder(this, index, type, id);
}
@Override
public DeleteRequestBuilder prepareDelete() {
return new DeleteRequestBuilder(this, null);

View File

@ -43,6 +43,8 @@ import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.client.transport.action.ClientTransportActionModule;
@ -80,8 +82,6 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde
* <p/>
* <p>The transport client important modules used is the {@link org.elasticsearch.transport.TransportModule} which is
* started in client mode (only connects, no bind).
*
*
*/
public class TransportClient extends AbstractClient {
@ -259,6 +259,16 @@ public class TransportClient extends AbstractClient {
internalClient.index(request, listener);
}
@Override
public ActionFuture<UpdateResponse> update(UpdateRequest request) {
return internalClient.update(request);
}
@Override
public void update(UpdateRequest request, ActionListener<UpdateResponse> listener) {
internalClient.update(request, listener);
}
@Override
public ActionFuture<DeleteResponse> delete(DeleteRequest request) {
return internalClient.delete(request);

View File

@ -61,6 +61,7 @@ import org.elasticsearch.client.transport.action.index.ClientTransportIndexActio
import org.elasticsearch.client.transport.action.percolate.ClientTransportPercolateAction;
import org.elasticsearch.client.transport.action.search.ClientTransportSearchAction;
import org.elasticsearch.client.transport.action.search.ClientTransportSearchScrollAction;
import org.elasticsearch.client.transport.action.update.ClientTransportUpdateAction;
import org.elasticsearch.common.inject.AbstractModule;
/**
@ -80,6 +81,7 @@ public class ClientTransportActionModule extends AbstractModule {
bind(ClientTransportSearchScrollAction.class).asEagerSingleton();
bind(ClientTransportBulkAction.class).asEagerSingleton();
bind(ClientTransportPercolateAction.class).asEagerSingleton();
bind(ClientTransportUpdateAction.class).asEagerSingleton();
bind(ClientTransportIndicesExistsAction.class).asEagerSingleton();
bind(ClientTransportIndicesStatsAction.class).asEagerSingleton();

View File

@ -0,0 +1,44 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transport.action.update;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.action.support.BaseClientTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportService;
/**
*
*/
public class ClientTransportUpdateAction extends BaseClientTransportAction<UpdateRequest, UpdateResponse> {
@Inject
public ClientTransportUpdateAction(Settings settings, TransportService transportService) {
super(settings, transportService, UpdateResponse.class);
}
@Override
protected String action() {
return TransportActions.UPDATE;
}
}

View File

@ -42,6 +42,8 @@ import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.internal.InternalClient;
import org.elasticsearch.client.support.AbstractClient;
@ -57,6 +59,7 @@ import org.elasticsearch.client.transport.action.mlt.ClientTransportMoreLikeThis
import org.elasticsearch.client.transport.action.percolate.ClientTransportPercolateAction;
import org.elasticsearch.client.transport.action.search.ClientTransportSearchAction;
import org.elasticsearch.client.transport.action.search.ClientTransportSearchScrollAction;
import org.elasticsearch.client.transport.action.update.ClientTransportUpdateAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -75,6 +78,8 @@ public class InternalTransportClient extends AbstractClient implements InternalC
private final ClientTransportIndexAction indexAction;
private final ClientTransportUpdateAction updateAction;
private final ClientTransportDeleteAction deleteAction;
private final ClientTransportBulkAction bulkAction;
@ -98,7 +103,7 @@ public class InternalTransportClient extends AbstractClient implements InternalC
@Inject
public InternalTransportClient(Settings settings, ThreadPool threadPool,
TransportClientNodesService nodesService, InternalTransportAdminClient adminClient,
ClientTransportIndexAction indexAction, ClientTransportDeleteAction deleteAction, ClientTransportBulkAction bulkAction, ClientTransportGetAction getAction, ClientTransportMultiGetAction multiGetAction,
ClientTransportIndexAction indexAction, ClientTransportUpdateAction updateAction, ClientTransportDeleteAction deleteAction, ClientTransportBulkAction bulkAction, ClientTransportGetAction getAction, ClientTransportMultiGetAction multiGetAction,
ClientTransportDeleteByQueryAction deleteByQueryAction, ClientTransportCountAction countAction,
ClientTransportSearchAction searchAction, ClientTransportSearchScrollAction searchScrollAction,
ClientTransportMoreLikeThisAction moreLikeThisAction, ClientTransportPercolateAction percolateAction) {
@ -107,6 +112,7 @@ public class InternalTransportClient extends AbstractClient implements InternalC
this.adminClient = adminClient;
this.indexAction = indexAction;
this.updateAction = updateAction;
this.deleteAction = deleteAction;
this.bulkAction = bulkAction;
this.getAction = getAction;
@ -154,6 +160,26 @@ public class InternalTransportClient extends AbstractClient implements InternalC
}, listener);
}
@Override
public ActionFuture<UpdateResponse> update(final UpdateRequest request) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<UpdateResponse>>() {
@Override
public ActionFuture<UpdateResponse> doWithNode(DiscoveryNode node) throws ElasticSearchException {
return updateAction.execute(node, request);
}
});
}
@Override
public void update(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<UpdateResponse>() {
@Override
public void doWithNode(DiscoveryNode node, ActionListener<UpdateResponse> listener) throws ElasticSearchException {
updateAction.execute(node, request, listener);
}
}, listener);
}
@Override
public ActionFuture<DeleteResponse> delete(final DeleteRequest request) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<DeleteResponse>>() {

View File

@ -342,7 +342,7 @@ public class MappingMetaData {
* Converts the serialized compressed form of the mappings into a parsed map.
*/
public Map<String, Object> sourceAsMap() throws IOException {
Map<String, Object> mapping = XContentHelper.convertToMap(source.compressed(), 0, source.compressed().length).v2();
Map<String, Object> mapping = XContentHelper.convertToMap(source.compressed(), 0, source.compressed().length, true).v2();
if (mapping.size() == 1 && mapping.containsKey(type())) {
// the type name is the root value, reduce it
mapping = (Map<String, Object>) mapping.get(type());

View File

@ -19,9 +19,12 @@
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UTFDataFormatException;
import java.util.*;
/**
*
@ -249,4 +252,65 @@ public abstract class StreamInput extends InputStream {
// readBytes(b, off, len);
// return len;
// }
public
@Nullable
Map<String, Object> readMap() throws IOException {
return (Map<String, Object>) readFieldValue();
}
@SuppressWarnings({"unchecked"})
private
@Nullable
Object readFieldValue() throws IOException {
byte type = readByte();
if (type == -1) {
return null;
} else if (type == 0) {
return readUTF();
} else if (type == 1) {
return readInt();
} else if (type == 2) {
return readLong();
} else if (type == 3) {
return readFloat();
} else if (type == 4) {
return readDouble();
} else if (type == 5) {
return readBoolean();
} else if (type == 6) {
int bytesSize = readVInt();
byte[] value = new byte[bytesSize];
readFully(value);
return value;
} else if (type == 7) {
int size = readVInt();
List list = new ArrayList(size);
for (int i = 0; i < size; i++) {
list.add(readFieldValue());
}
return list;
} else if (type == 8) {
int size = readVInt();
Object[] list = new Object[size];
for (int i = 0; i < size; i++) {
list[i] = readFieldValue();
}
return list;
} else if (type == 9 || type == 10) {
int size = readVInt();
Map map;
if (type == 9) {
map = new LinkedHashMap(size);
} else {
map = new HashMap(size);
}
for (int i = 0; i < size; i++) {
map.put(readUTF(), readFieldValue());
}
return map;
} else {
throw new IOException("Can't read unknown type [" + type + "]");
}
}
}

View File

@ -19,8 +19,13 @@
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.Nullable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
*
@ -209,4 +214,67 @@ public abstract class StreamOutput extends OutputStream {
public void write(byte[] b, int off, int len) throws IOException {
writeBytes(b, off, len);
}
public void writeMap(@Nullable Map<String, Object> map) throws IOException {
writeValue(map);
}
private void writeValue(@Nullable Object value) throws IOException {
if (value == null) {
writeByte((byte) -1);
return;
}
Class type = value.getClass();
if (type == String.class) {
writeByte((byte) 0);
writeUTF((String) value);
} else if (type == Integer.class) {
writeByte((byte) 1);
writeInt((Integer) value);
} else if (type == Long.class) {
writeByte((byte) 2);
writeLong((Long) value);
} else if (type == Float.class) {
writeByte((byte) 3);
writeFloat((Float) value);
} else if (type == Double.class) {
writeByte((byte) 4);
writeDouble((Double) value);
} else if (type == Boolean.class) {
writeByte((byte) 5);
writeBoolean((Boolean) value);
} else if (type == byte[].class) {
writeByte((byte) 6);
writeVInt(((byte[]) value).length);
writeBytes(((byte[]) value));
} else if (value instanceof List) {
writeByte((byte) 7);
List list = (List) value;
writeVInt(list.size());
for (Object o : list) {
writeValue(o);
}
} else if (value instanceof Object[]) {
writeByte((byte) 8);
Object[] list = (Object[]) value;
writeVInt(list.length);
for (Object o : list) {
writeValue(o);
}
} else if (value instanceof Map) {
if (value instanceof LinkedHashMap) {
writeByte((byte) 9);
} else {
writeByte((byte) 10);
}
Map<String, Object> map = (Map<String, Object>) value;
writeVInt(map.size());
for (Map.Entry<String, Object> entry : map.entrySet()) {
writeUTF(entry.getKey());
writeValue(entry.getValue());
}
} else {
throw new IOException("Can't write type [" + type + "]");
}
}
}

View File

@ -50,17 +50,24 @@ public class XContentHelper {
}
}
public static Tuple<XContentType, Map<String, Object>> convertToMap(byte[] data, int offset, int length) throws ElasticSearchParseException {
public static Tuple<XContentType, Map<String, Object>> convertToMap(byte[] data, int offset, int length, boolean ordered) throws ElasticSearchParseException {
try {
XContentParser parser;
XContentType contentType;
if (LZF.isCompressed(data, offset, length)) {
BytesStreamInput siBytes = new BytesStreamInput(data, offset, length);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
XContentType contentType = XContentFactory.xContentType(siLzf);
contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();
return Tuple.create(contentType, XContentFactory.xContent(contentType).createParser(siLzf).mapAndClose());
parser = XContentFactory.xContent(contentType).createParser(siLzf);
} else {
XContentType contentType = XContentFactory.xContentType(data, offset, length);
return Tuple.create(contentType, XContentFactory.xContent(contentType).createParser(data, offset, length).mapAndClose());
contentType = XContentFactory.xContentType(data, offset, length);
parser = XContentFactory.xContent(contentType).createParser(data, offset, length);
}
if (ordered) {
return Tuple.create(contentType, parser.mapOrderedAndClose());
} else {
return Tuple.create(contentType, parser.mapAndClose());
}
} catch (IOException e) {
throw new ElasticSearchParseException("Failed to parse content to map", e);

View File

@ -122,6 +122,8 @@ public interface XContentParser {
Map<String, Object> mapAndClose() throws IOException;
Map<String, Object> mapOrderedAndClose() throws IOException;
String text() throws IOException;
String textOrNull() throws IOException;

View File

@ -124,4 +124,13 @@ public abstract class AbstractXContentParser implements XContentParser {
close();
}
}
@Override
public Map<String, Object> mapOrderedAndClose() throws IOException {
try {
return mapOrdered();
} finally {
close();
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
/**
*
*/
public class DocumentMissingEngineException extends EngineException {
public DocumentMissingEngineException(ShardId shardId, String type, String id) {
super(shardId, "[" + type + "][" + id + "]: document missing");
}
@Override
public RestStatus status() {
return RestStatus.CONFLICT;
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
/**
*
*/
public class DocumentSourceMissingEngineException extends EngineException {
public DocumentSourceMissingEngineException(ShardId shardId, String type, String id) {
super(shardId, "[" + type + "][" + id + "]: document source missing");
}
@Override
public RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}

View File

@ -179,6 +179,13 @@ public class GetResult implements Streamable, Iterable<GetField>, ToXContent {
return this.source;
}
/**
* Internal source representation, might be compressed....
*/
public BytesHolder internalSourceRef() {
return source;
}
/**
* Is the source empty (not available) or not.
*/

View File

@ -230,7 +230,7 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
if (filtered) {
// we don't update the context source if we filter, we want to keep it as is...
Tuple<XContentType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(data, dataOffset, dataLength);
Tuple<XContentType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(data, dataOffset, dataLength, true);
Map<String, Object> filteredSource = XContentMapValues.filter(mapTuple.v2(), includes, excludes);
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
StreamOutput streamOutput;

View File

@ -71,6 +71,7 @@ import org.elasticsearch.rest.action.mlt.RestMoreLikeThisAction;
import org.elasticsearch.rest.action.percolate.RestPercolateAction;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
import org.elasticsearch.rest.action.update.RestUpdateAction;
import java.util.List;
@ -137,16 +138,14 @@ public class RestActionModule extends AbstractModule {
bind(RestClearIndicesCacheAction.class).asEagerSingleton();
bind(RestIndexAction.class).asEagerSingleton();
bind(RestGetAction.class).asEagerSingleton();
bind(RestMultiGetAction.class).asEagerSingleton();
bind(RestDeleteAction.class).asEagerSingleton();
bind(RestDeleteByQueryAction.class).asEagerSingleton();
bind(RestCountAction.class).asEagerSingleton();
bind(RestBulkAction.class).asEagerSingleton();
bind(RestUpdateAction.class).asEagerSingleton();
bind(RestPercolateAction.class).asEagerSingleton();
bind(RestSearchAction.class).asEagerSingleton();
bind(RestSearchScrollAction.class).asEagerSingleton();
@ -154,7 +153,5 @@ public class RestActionModule extends AbstractModule {
bind(RestValidateQueryAction.class).asEagerSingleton();
bind(RestMoreLikeThisAction.class).asEagerSingleton();
bind(RestPercolateAction.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.update;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.CREATED;
import static org.elasticsearch.rest.RestStatus.OK;
/**
*/
public class RestUpdateAction extends BaseRestHandler {
@Inject
public RestUpdateAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(POST, "/{index}/{type}/{id}/_update", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
UpdateRequest updateRequest = new UpdateRequest(request.param("index"), request.param("type"), request.param("id"));
updateRequest.routing(request.param("routing"));
updateRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
updateRequest.timeout(request.paramAsTime("timeout", updateRequest.timeout()));
String replicationType = request.param("replication");
if (replicationType != null) {
updateRequest.replicationType(ReplicationType.fromString(replicationType));
}
String consistencyLevel = request.param("consistency");
if (consistencyLevel != null) {
updateRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
}
// we just send a response, no need to fork
updateRequest.listenerThreaded(false);
updateRequest.script(request.param("script"));
updateRequest.scriptLang(request.param("lang"));
for (Map.Entry<String, String> entry : request.params().entrySet()) {
if (entry.getKey().startsWith("sp_")) {
updateRequest.addScriptParam(entry.getKey().substring(3), entry.getValue());
}
}
updateRequest.retryOnConflict(request.paramAsInt("retry_on_conflict", updateRequest.retryOnConflict()));
// see if we have it in the body
if (request.hasContent()) {
XContentType xContentType = XContentFactory.xContentType(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());
if (xContentType != null) {
try {
Map<String, Object> content = XContentFactory.xContent(xContentType)
.createParser(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()).mapAndClose();
if (content.containsKey("script")) {
updateRequest.script(content.get("script").toString());
}
if (content.containsKey("lang")) {
updateRequest.scriptLang(content.get("lang").toString());
}
if (content.containsKey("params")) {
updateRequest.scriptParams((Map<String, Object>) content.get("params"));
}
} catch (Exception e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.warn("Failed to send response", e1);
}
return;
}
}
}
client.update(updateRequest, new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject()
.field(Fields.OK, true)
.field(Fields._INDEX, response.index())
.field(Fields._TYPE, response.type())
.field(Fields._ID, response.id())
.field(Fields._VERSION, response.version());
builder.endObject();
RestStatus status = OK;
if (response.version() == 1) {
status = CREATED;
}
channel.sendResponse(new XContentRestResponse(request, status, builder));
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
static final class Fields {
static final XContentBuilderString OK = new XContentBuilderString("ok");
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
}
}

View File

@ -76,7 +76,7 @@ public class SourceLookup implements Map {
}
public static Map<String, Object> sourceAsMap(byte[] bytes, int offset, int length) throws ElasticSearchParseException {
return XContentHelper.convertToMap(bytes, offset, length).v2();
return XContentHelper.convertToMap(bytes, offset, length, false).v2();
}
public void setNextReader(IndexReader reader) {

View File

@ -1,6 +0,0 @@
cluster:
routing:
schedule: 100ms
index:
number_of_shards: 5
number_of_replicas: 1

View File

@ -22,6 +22,8 @@ package org.elasticsearch.test.integration.client.transport;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.integration.document.DocumentActionsTests;
@ -38,6 +40,7 @@ public class TransportClientDocumentActionsTests extends DocumentActionsTests {
protected Client getClient1() {
TransportAddress server1Address = ((InternalNode) node("server1")).injector().getInstance(TransportService.class).boundAddress().publishAddress();
TransportClient client = new TransportClient(settingsBuilder()
.put(nodeSettings())
.put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName())
.put("client.transport.sniff", false).build());
client.addTransportAddress(server1Address);
@ -48,9 +51,15 @@ public class TransportClientDocumentActionsTests extends DocumentActionsTests {
protected Client getClient2() {
TransportAddress server2Address = ((InternalNode) node("server2")).injector().getInstance(TransportService.class).boundAddress().publishAddress();
TransportClient client = new TransportClient(settingsBuilder()
.put(nodeSettings())
.put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName())
.put("client.transport.sniff", false).build());
client.addTransportAddress(server2Address);
return client;
}
@Override
protected Settings nodeSettings() {
return ImmutableSettings.settingsBuilder().put("client.transport.nodes_sampler_interval", "30s").build();
}
}

View File

@ -1,12 +0,0 @@
cluster:
routing:
schedule: 100ms
index:
number_of_shards: 5
number_of_replicas: 1
# use large interval node sampler
client:
transport:
nodes_sampler_interval: 30s

View File

@ -1,56 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.client.transport;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.integration.document.MoreLikeThisActionTests;
import org.elasticsearch.transport.TransportService;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
/**
*
*/
public class TransportClientMoreLikeThisActionTests extends MoreLikeThisActionTests {
@Override
protected Client getClient1() {
TransportAddress server1Address = ((InternalNode) node("server1")).injector().getInstance(TransportService.class).boundAddress().publishAddress();
TransportClient client = new TransportClient(settingsBuilder()
.put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName())
.put("discovery.enabled", false).build());
client.addTransportAddress(server1Address);
return client;
}
@Override
protected Client getClient2() {
TransportAddress server1Address = ((InternalNode) node("server2")).injector().getInstance(TransportService.class).boundAddress().publishAddress();
TransportClient client = new TransportClient(settingsBuilder()
.put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName())
.put("discovery.enabled", false).build());
client.addTransportAddress(server1Address);
return client;
}
}

View File

@ -1,6 +0,0 @@
cluster:
routing:
schedule: 100ms
index:
number_of_shards: 5
number_of_replicas: 1

View File

@ -22,6 +22,8 @@ package org.elasticsearch.test.integration.client.transport;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.integration.document.DocumentActionsTests;
@ -38,6 +40,7 @@ public class TransportClientSniffDocumentActionsTests extends DocumentActionsTes
protected Client getClient1() {
TransportAddress server1Address = ((InternalNode) node("server1")).injector().getInstance(TransportService.class).boundAddress().publishAddress();
TransportClient client = new TransportClient(settingsBuilder()
.put(nodeSettings())
.put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName())
.put("client.transport.sniff", true).build());
client.addTransportAddress(server1Address);
@ -48,9 +51,15 @@ public class TransportClientSniffDocumentActionsTests extends DocumentActionsTes
protected Client getClient2() {
TransportAddress server2Address = ((InternalNode) node("server2")).injector().getInstance(TransportService.class).boundAddress().publishAddress();
TransportClient client = new TransportClient(settingsBuilder()
.put(nodeSettings())
.put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName())
.put("client.transport.sniff", true).build());
client.addTransportAddress(server2Address);
return client;
}
@Override
protected Settings nodeSettings() {
return ImmutableSettings.settingsBuilder().put("client.transport.nodes_sampler_interval", "30s").build();
}
}

View File

@ -1,12 +0,0 @@
cluster:
routing:
schedule: 100ms
index:
number_of_shards: 5
number_of_replicas: 1
# use large interval node sampler
client:
transport:
nodes_sampler_interval: 30s

View File

@ -19,6 +19,9 @@
package org.elasticsearch.test.integration.document;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import static org.elasticsearch.client.Requests.createIndexRequest;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
@ -42,4 +45,9 @@ public class AliasedIndexDocumentActionsTests extends DocumentActionsTests {
protected String getConcreteIndexName() {
return "test1";
}
@Override
protected Settings nodeSettings() {
return ImmutableSettings.settingsBuilder().put("action.auto_create_index", false).build();
}
}

View File

@ -1,8 +0,0 @@
cluster:
routing:
schedule: 100ms
index:
number_of_shards: 5
number_of_replicas: 1
action:
auto_create_index: false

View File

@ -34,10 +34,14 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.DocumentMissingEngineException;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@ -62,8 +66,8 @@ public class DocumentActionsTests extends AbstractNodesTests {
@BeforeClass
public void startNodes() {
startNode("server1");
startNode("server2");
startNode("server1", nodeSettings());
startNode("server2", nodeSettings());
client1 = getClient1();
client2 = getClient2();
@ -87,6 +91,10 @@ public class DocumentActionsTests extends AbstractNodesTests {
client1.admin().indices().create(createIndexRequest("test")).actionGet();
}
protected Settings nodeSettings() {
return ImmutableSettings.Builder.EMPTY_SETTINGS;
}
protected String getConcreteIndexName() {
return "test";
}
@ -261,6 +269,57 @@ public class DocumentActionsTests extends AbstractNodesTests {
}
}
@Test
public void testUpdate() throws Exception {
createIndex();
ClusterHealthResponse clusterHealth = client1.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
try {
client1.prepareUpdate("test", "type1", "1").setScript("ctx._source.field++").execute().actionGet();
assert false;
} catch (DocumentMissingEngineException e) {
// all is well
}
client1.prepareIndex("test", "type1", "1").setSource("field", 1).execute().actionGet();
UpdateResponse updateResponse = client1.prepareUpdate("test", "type1", "1").setScript("ctx._source.field += 1").execute().actionGet();
assertThat(updateResponse.version(), equalTo(2L));
for (int i = 0; i < 5; i++) {
GetResponse getResponse = client1.prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("2"));
}
updateResponse = client1.prepareUpdate("test", "type1", "1").setScript("ctx._source.field += count").addScriptParam("count", 3).execute().actionGet();
assertThat(updateResponse.version(), equalTo(3L));
for (int i = 0; i < 5; i++) {
GetResponse getResponse = client1.prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("5"));
}
// check noop
updateResponse = client1.prepareUpdate("test", "type1", "1").setScript("ctx.op = 'none'").execute().actionGet();
assertThat(updateResponse.version(), equalTo(3L));
for (int i = 0; i < 5; i++) {
GetResponse getResponse = client1.prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("5"));
}
// check delete
updateResponse = client1.prepareUpdate("test", "type1", "1").setScript("ctx.op = 'delete'").execute().actionGet();
assertThat(updateResponse.version(), equalTo(4L));
for (int i = 0; i < 5; i++) {
GetResponse getResponse = client1.prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.exists(), equalTo(false));
}
}
@Test
public void testBulk() throws Exception {
createIndex();

View File

@ -1,6 +0,0 @@
cluster:
routing:
schedule: 100ms
index:
number_of_shards: 5
number_of_replicas: 1

View File

@ -19,8 +19,16 @@
package org.elasticsearch.test.integration.document;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
/**
*
*/
public class LocalDocumentActionsTests extends DocumentActionsTests {
@Override
protected Settings nodeSettings() {
return ImmutableSettings.settingsBuilder().put("node.local", true).build();
}
}

View File

@ -1,8 +0,0 @@
node:
local: true
cluster:
routing:
schedule: 100ms
index:
number_of_shards: 5
number_of_replicas: 1

View File

@ -1,6 +0,0 @@
cluster:
routing:
schedule: 100ms
index:
number_of_shards: 5
number_of_replicas: 1

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.test.integration.document;
package org.elasticsearch.test.integration.get;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.test.integration.document;
package org.elasticsearch.test.integration.mlt;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;