Add `flush` method for BulkProcessor class

There is no explicit method `flush/execute` in `BulkProcessor` class. This can be useful in certain scenarios.
Currently it requires to close and create a new BulkProcessor if one wants an immediate flush.

Closes #5575.
Closes #5570.
This commit is contained in:
kul 2014-03-27 16:55:26 +05:30 committed by David Pilato
parent 0b449d3040
commit dc19e06e27
2 changed files with 59 additions and 11 deletions

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import java.io.Closeable;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -43,7 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
* <p/> * <p/>
* In order to create a new bulk processor, use the {@link Builder}. * In order to create a new bulk processor, use the {@link Builder}.
*/ */
public class BulkProcessor { public class BulkProcessor implements Closeable {
/** /**
* A listener for the execution. * A listener for the execution.
@ -191,6 +192,7 @@ public class BulkProcessor {
} }
} }
@Override
/** /**
* Closes the processor. If flushing by time is enabled, then its shutdown. Any remaining bulk actions are flushed. * Closes the processor. If flushing by time is enabled, then its shutdown. Any remaining bulk actions are flushed.
*/ */
@ -235,7 +237,14 @@ public class BulkProcessor {
return this; return this;
} }
public void ensureOpen() {
if (closed) {
throw new ElasticsearchIllegalStateException("bulk process already closed");
}
}
private synchronized void internalAdd(ActionRequest request, @Nullable Object payload) { private synchronized void internalAdd(ActionRequest request, @Nullable Object payload) {
ensureOpen();
bulkRequest.add(request, payload); bulkRequest.add(request, payload);
executeIfNeeded(); executeIfNeeded();
} }
@ -251,9 +260,7 @@ public class BulkProcessor {
} }
private void executeIfNeeded() { private void executeIfNeeded() {
if (closed) { ensureOpen();
throw new ElasticsearchIllegalStateException("bulk process already closed");
}
if (!isOverTheLimit()) { if (!isOverTheLimit()) {
return; return;
} }
@ -322,6 +329,16 @@ public class BulkProcessor {
return false; return false;
} }
/**
* Flush pending delete or index requests.
*/
public synchronized void flush() {
ensureOpen();
if (bulkRequest.numberOfActions() > 0) {
execute();
}
}
class Flush implements Runnable { class Flush implements Runnable {
@Override @Override

View File

@ -587,9 +587,8 @@ public class BulkTests extends ElasticsearchIntegrationTest {
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
}; };
BulkProcessor processor = null; try (BulkProcessor processor = BulkProcessor.builder(client(), listener).setBulkActions(5)
try { .setConcurrentRequests(1).setName("foo").build()) {
processor = BulkProcessor.builder(client(), listener).setBulkActions(5).setConcurrentRequests(1).setName("foo").build();
Map<String, Object> data = Maps.newHashMap(); Map<String, Object> data = Maps.newHashMap();
data.put("foo", "bar"); data.put("foo", "bar");
@ -602,10 +601,6 @@ public class BulkTests extends ElasticsearchIntegrationTest {
BulkResponse response = responseQueue.poll(5, TimeUnit.SECONDS); BulkResponse response = responseQueue.poll(5, TimeUnit.SECONDS);
assertThat("Could not get a bulk response in 5 seconds", response, is(notNullValue())); assertThat("Could not get a bulk response in 5 seconds", response, is(notNullValue()));
assertThat(response.getItems().length, is(5)); assertThat(response.getItems().length, is(5));
} finally {
if (processor != null) {
processor.close();
}
} }
} }
@ -626,4 +621,40 @@ public class BulkTests extends ElasticsearchIntegrationTest {
assertThat(bulkResponse.getItems()[i].isFailed(), is(expectedFailures[i])); assertThat(bulkResponse.getItems()[i].isFailed(), is(expectedFailures[i]));
} }
} }
@Test
public void testBulkProcessorFlush() throws InterruptedException {
final BlockingQueue<BulkResponse> responseQueue = new SynchronousQueue();
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
responseQueue.add(response);
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
};
try (BulkProcessor processor = BulkProcessor.builder(client(), listener).setBulkActions(6)
.setConcurrentRequests(1).setName("foo").build()) {
Map<String, Object> data = Maps.newHashMap();
data.put("foo", "bar");
processor.add(new IndexRequest("test", "test", "1").source(data));
processor.add(new IndexRequest("test", "test", "2").source(data));
processor.add(new IndexRequest("test", "test", "3").source(data));
processor.add(new IndexRequest("test", "test", "4").source(data));
processor.add(new IndexRequest("test", "test", "5").source(data));
processor.flush();
BulkResponse response = responseQueue.poll(1, TimeUnit.SECONDS);
assertThat("Could not get a bulk response even after an explicit flush.", response, is(notNullValue()));
assertThat(response.getItems().length, is(5));
}
}
} }