add some prepare APIs to the groovy APIs

This commit is contained in:
kimchy 2010-06-19 21:09:03 +03:00
parent 891c69f8de
commit 0f2147aeec
4 changed files with 244 additions and 11 deletions

View File

@ -32,19 +32,27 @@ import java.util.List;
*/
public abstract class AbstractListenableActionFuture<T, L> extends AdapterActionFuture<T, L> implements ListenableActionFuture<T> {
private final boolean listenerThreaded;
final boolean listenerThreaded;
private final ThreadPool threadPool;
final ThreadPool threadPool;
private volatile Object listeners;
volatile Object listeners;
private boolean executedListeners = false;
boolean executedListeners = false;
protected AbstractListenableActionFuture(boolean listenerThreaded, ThreadPool threadPool) {
this.listenerThreaded = listenerThreaded;
this.threadPool = threadPool;
}
public boolean listenerThreaded() {
return listenerThreaded;
}
public ThreadPool threadPool() {
return threadPool;
}
public void addListener(final ActionListener<T> listener) {
internalAddListener(listener);
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.groovy.client
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.ListenableActionFuture
import org.elasticsearch.action.count.CountRequest
import org.elasticsearch.action.count.CountResponse
import org.elasticsearch.action.delete.DeleteRequest
@ -36,6 +37,14 @@ import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.action.terms.TermsRequest
import org.elasticsearch.action.terms.TermsResponse
import org.elasticsearch.client.Client
import org.elasticsearch.client.action.count.CountRequestBuilder
import org.elasticsearch.client.action.delete.DeleteRequestBuilder
import org.elasticsearch.client.action.deletebyquery.DeleteByQueryRequestBuilder
import org.elasticsearch.client.action.get.GetRequestBuilder
import org.elasticsearch.client.action.index.IndexRequestBuilder
import org.elasticsearch.client.action.search.SearchRequestBuilder
import org.elasticsearch.client.action.support.BaseRequestBuilder
import org.elasticsearch.client.action.terms.TermsRequestBuilder
import org.elasticsearch.client.internal.InternalClient
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.groovy.client.action.GActionFuture
@ -47,12 +56,23 @@ import org.elasticsearch.groovy.util.xcontent.GXContentBuilder
class GClient {
static {
BaseRequestBuilder.metaClass.gexecute = {
ListenableActionFuture future = delegate.execute();
return new GActionFuture(future);
}
IndexRequest.metaClass.setSource = {Closure c ->
delegate.source(new GXContentBuilder().buildAsBytes(c, indexContentType))
}
IndexRequest.metaClass.source = {Closure c ->
delegate.source(new GXContentBuilder().buildAsBytes(c, indexContentType))
}
IndexRequestBuilder.metaClass.setSource = {Closure c ->
delegate.setSource(new GXContentBuilder().buildAsBytes(c, indexContentType))
}
IndexRequestBuilder.metaClass.source = {Closure c ->
delegate.setSource(new GXContentBuilder().buildAsBytes(c, indexContentType))
}
DeleteByQueryRequest.metaClass.setQuery = {Closure c ->
delegate.query(new GXContentBuilder().buildAsBytes(c, contentType))
@ -60,6 +80,12 @@ class GClient {
DeleteByQueryRequest.metaClass.query = {Closure c ->
delegate.query(new GXContentBuilder().buildAsBytes(c, contentType))
}
DeleteByQueryRequestBuilder.metaClass.setQuery = {Closure c ->
delegate.setQuery(new GXContentBuilder().buildAsBytes(c, contentType))
}
DeleteByQueryRequestBuilder.metaClass.query = {Closure c ->
delegate.setQuery(new GXContentBuilder().buildAsBytes(c, contentType))
}
CountRequest.metaClass.setQuery = {Closure c ->
delegate.query(new GXContentBuilder().buildAsBytes(c, contentType))
@ -67,6 +93,12 @@ class GClient {
CountRequest.metaClass.query = {Closure c ->
delegate.query(new GXContentBuilder().buildAsBytes(c, contentType))
}
CountRequestBuilder.metaClass.setQuery = {Closure c ->
delegate.setQuery(new GXContentBuilder().buildAsBytes(c, contentType))
}
CountRequestBuilder.metaClass.query = {Closure c ->
delegate.setQuery(new GXContentBuilder().buildAsBytes(c, contentType))
}
SearchRequest.metaClass.setSource = {Closure c ->
delegate.source(new GXContentBuilder().buildAsBytes(c, contentType))
@ -80,6 +112,18 @@ class GClient {
SearchRequest.metaClass.extraSource = {Closure c ->
delegate.extraSource(new GXContentBuilder().buildAsBytes(c, contentType))
}
SearchRequestBuilder.metaClass.setSource = {Closure c ->
delegate.setSource(new GXContentBuilder().buildAsBytes(c, contentType))
}
SearchRequestBuilder.metaClass.source = {Closure c ->
delegate.setSource(new GXContentBuilder().buildAsBytes(c, contentType))
}
SearchRequestBuilder.metaClass.setExtraSource = {Closure c ->
delegate.setExtraSource(new GXContentBuilder().buildAsBytes(c, contentType))
}
SearchRequestBuilder.metaClass.extraSource = {Closure c ->
delegate.setExtraSource(new GXContentBuilder().buildAsBytes(c, contentType))
}
MoreLikeThisRequest.metaClass.setSearchSource = {Closure c ->
delegate.searchSource(new GXContentBuilder().buildAsBytes(c, contentType))
@ -108,6 +152,14 @@ class GClient {
this.admin = new GAdminClient(this)
}
IndexRequestBuilder prepareIndex(String index, String type) {
return client.prepareIndex(index, type);
}
IndexRequestBuilder prepareIndex(String index, String type, String id) {
return client.prepareIndex(index, type, id);
}
GActionFuture<IndexResponse> index(Closure c) {
IndexRequest request = new IndexRequest()
c.setDelegate request
@ -126,6 +178,10 @@ class GClient {
client.index(request, listener)
}
GetRequestBuilder prepareGet(String index, String type, String id) {
return client.prepareGet(index, type, id);
}
GActionFuture<GetResponse> get(Closure c) {
GetRequest request = new GetRequest()
c.setDelegate request
@ -144,6 +200,10 @@ class GClient {
client.get(request, listener)
}
DeleteRequestBuilder prepareDelete(String index, String type, String id) {
return client.prepareDelete(index, type, id)
}
GActionFuture<DeleteResponse> delete(Closure c) {
DeleteRequest request = new DeleteRequest()
c.resolveStrategy = resolveStrategy
@ -162,6 +222,10 @@ class GClient {
client.delete(request, listener)
}
DeleteByQueryRequestBuilder prepareDeleteByQuery(String... indices) {
return client.prepareDeleteByQuery(indices);
}
GActionFuture<DeleteByQueryResponse> deleteByQuery(Closure c) {
DeleteByQueryRequest request = new DeleteByQueryRequest()
c.resolveStrategy = resolveStrategy
@ -180,6 +244,10 @@ class GClient {
client.deleteByQuery(request, listener)
}
CountRequestBuilder prepareCount(String... indices) {
return client.prepareCount(indices)
}
GActionFuture<CountResponse> count(Closure c) {
CountRequest request = new CountRequest()
c.resolveStrategy = resolveStrategy
@ -198,6 +266,10 @@ class GClient {
client.count(request, listener)
}
SearchRequestBuilder prepareSearch(String... indices) {
return client.prepareSearch(indices)
}
GActionFuture<SearchResponse> search(Closure c) {
SearchRequest request = new SearchRequest()
c.resolveStrategy = resolveStrategy
@ -216,6 +288,10 @@ class GClient {
client.search(request, listener)
}
TermsRequestBuilder prepareTerms(String... indices) {
return client.prepareTerms(indices)
}
GActionFuture<TermsResponse> terms(Closure c) {
TermsRequest request = new TermsRequest()
c.resolveStrategy = resolveStrategy

View File

@ -23,19 +23,28 @@ import groovy.lang.Closure;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author kimchy (shay.banon)
*/
public class GActionFuture<T> extends PlainListenableActionFuture<T> {
public class GActionFuture<T> implements ListenableActionFuture<T>, ActionListener<T> {
protected GActionFuture(ThreadPool threadPool, ActionRequest request) {
super(request.listenerThreaded(), threadPool);
private final PlainListenableActionFuture<T> future;
public GActionFuture(ListenableActionFuture<T> future) {
this.future = (PlainListenableActionFuture<T>) future;
}
public GActionFuture(ThreadPool threadPool, ActionRequest request) {
this.future = new PlainListenableActionFuture<T>(request.listenerThreaded(), threadPool);
}
public void setListener(final Closure listener) {
@ -79,18 +88,76 @@ public class GActionFuture<T> extends PlainListenableActionFuture<T> {
}
public T response(String timeout) throws ElasticSearchException {
return super.actionGet(timeout);
return actionGet(timeout);
}
public T response(long timeoutMillis) throws ElasticSearchException {
return super.actionGet(timeoutMillis);
return actionGet(timeoutMillis);
}
public T response(TimeValue timeout) throws ElasticSearchException {
return super.actionGet(timeout);
return actionGet(timeout);
}
public T response(long timeout, TimeUnit unit) throws ElasticSearchException {
return super.actionGet(timeout, unit);
return actionGet(timeout, unit);
}
@Override public void onResponse(T t) {
future.onResponse(t);
}
@Override public void onFailure(Throwable e) {
future.onFailure(e);
}
// delegate methods
public void addListener(ActionListener<T> tActionListener) {
future.addListener(tActionListener);
}
@Override public void addListener(Runnable listener) {
future.addListener(listener);
}
@Override public T actionGet() throws ElasticSearchException {
return future.actionGet();
}
@Override public T actionGet(String timeout) throws ElasticSearchException {
return future.actionGet(timeout);
}
@Override public T actionGet(long timeoutMillis) throws ElasticSearchException {
return future.actionGet(timeoutMillis);
}
@Override public T actionGet(long timeout, TimeUnit unit) throws ElasticSearchException {
return future.actionGet(timeout, unit);
}
@Override public T actionGet(TimeValue timeout) throws ElasticSearchException {
return future.actionGet(timeout);
}
@Override public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
}
@Override public boolean isCancelled() {
return future.isCancelled();
}
@Override public boolean isDone() {
return future.isDone();
}
@Override public T get() throws InterruptedException, ExecutionException {
return future.get();
}
@Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return future.get(timeout, unit);
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.groovy.test.client
import org.elasticsearch.groovy.node.GNode
import org.elasticsearch.groovy.node.GNodeBuilder
import org.testng.annotations.AfterMethod
import org.testng.annotations.BeforeMethod
import org.testng.annotations.Test
import static org.hamcrest.MatcherAssert.*
import static org.hamcrest.Matchers.*
/**
* @author kimchy (shay.banon)
*/
class BuilderActionsTests {
def GNode node
@BeforeMethod
protected void setUp() {
GNodeBuilder nodeBuilder = new GNodeBuilder()
nodeBuilder.settings {
node {
local = true
}
}
node = nodeBuilder.node()
}
@AfterMethod
protected void tearDown() {
node.close()
}
@Test
void testSimpleOperations() {
def indexR = node.client.prepareIndex("test", "type1", "1").setSource({
test = "value"
complex {
value1 = "value1"
value2 = "value2"
}
}).gexecute()
assertThat indexR.response.index, equalTo("test")
assertThat indexR.response.type, equalTo("type1")
assertThat indexR.response.id, equalTo("1")
def delete = node.client.prepareDelete("test", "type1", "1").gexecute()
assertThat delete.response.index, equalTo("test")
assertThat delete.response.type, equalTo("type1")
assertThat delete.response.id, equalTo("1")
def refresh = node.client.admin.indices.refresh {}
assertThat refresh.response.failedShards, equalTo(0)
def get = node.client.prepareGet("test", "type1", "1").gexecute()
assertThat get.response.exists, equalTo(false)
}
}