Watcher: Cleanup - Replace list of integers with bitset (elastic/elasticsearch#4324)
In the triggered watch store a list of integers was returned to keep track which watches had been successfully stored and thus should be executed. This means, that an arraylist, plus autoboxing/unboxing needs to be done for all the triggered watches. This data structure can easily be replaced with a BitSet, resulting in much less objects being created or parsed - also it's a bit faster. Original commit: elastic/x-pack-elasticsearch@e9fba67e34
This commit is contained in:
parent
5716a20fd3
commit
779eb44b66
|
@ -34,6 +34,7 @@ import org.joda.time.DateTime;
|
|||
|
||||
import java.time.Clock;
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
|
@ -187,16 +188,13 @@ public class ExecutionService extends AbstractComponent {
|
|||
|
||||
logger.debug("saving watch records [{}]", triggeredWatches.size());
|
||||
|
||||
triggeredWatchStore.putAll(triggeredWatches, new ActionListener<List<Integer>>() {
|
||||
triggeredWatchStore.putAll(triggeredWatches, new ActionListener<BitSet>() {
|
||||
@Override
|
||||
public void onResponse(List<Integer> successFullSlots) {
|
||||
for (Integer slot : successFullSlots) {
|
||||
TriggeredWatch triggeredWatch = triggeredWatches.get(slot);
|
||||
try {
|
||||
executeAsync(contexts.get(slot), triggeredWatch);
|
||||
} catch (Exception e) {
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to execute watch [{}]", triggeredWatch.id()), e);
|
||||
}
|
||||
public void onResponse(BitSet slots) {
|
||||
int slot = 0;
|
||||
while ((slot = slots.nextSetBit(slot)) != -1) {
|
||||
executeAsync(contexts.get(slot), triggeredWatches.get(slot));
|
||||
slot++;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -236,9 +234,11 @@ public class ExecutionService extends AbstractComponent {
|
|||
return;
|
||||
}
|
||||
|
||||
List<Integer> slots = triggeredWatchStore.putAll(triggeredWatches);
|
||||
for (Integer slot : slots) {
|
||||
BitSet slots = triggeredWatchStore.putAll(triggeredWatches);
|
||||
int slot = 0;
|
||||
while ((slot = slots.nextSetBit(slot)) != -1) {
|
||||
executeAsync(contexts.get(slot), triggeredWatches.get(slot));
|
||||
slot++;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -129,10 +130,10 @@ public class TriggeredWatchStore extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
public void putAll(final List<TriggeredWatch> triggeredWatches, final ActionListener<List<Integer>> listener) throws Exception {
|
||||
public void putAll(final List<TriggeredWatch> triggeredWatches, final ActionListener<BitSet> listener) throws Exception {
|
||||
|
||||
if (triggeredWatches.isEmpty()) {
|
||||
listener.onResponse(Collections.emptyList());
|
||||
listener.onResponse(new BitSet(0));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -140,7 +141,9 @@ public class TriggeredWatchStore extends AbstractComponent {
|
|||
put(triggeredWatches.get(0), new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(Boolean success) {
|
||||
listener.onResponse(Collections.singletonList(0));
|
||||
BitSet bitSet = new BitSet(1);
|
||||
bitSet.set(0);
|
||||
listener.onResponse(bitSet);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -163,15 +166,14 @@ public class TriggeredWatchStore extends AbstractComponent {
|
|||
client.bulk(request, new ActionListener<BulkResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse response) {
|
||||
List<Integer> successFullSlots = new ArrayList<Integer>();
|
||||
BitSet successFullSlots = new BitSet(triggeredWatches.size());
|
||||
for (int i = 0; i < response.getItems().length; i++) {
|
||||
BulkItemResponse itemResponse = response.getItems()[i];
|
||||
if (itemResponse.isFailed()) {
|
||||
logger.error("could store triggered watch with id [{}], because failed [{}]", itemResponse.getId(),
|
||||
itemResponse.getFailureMessage());
|
||||
} else {
|
||||
IndexResponse indexResponse = itemResponse.getResponse();
|
||||
successFullSlots.add(i);
|
||||
successFullSlots.set(i);
|
||||
}
|
||||
}
|
||||
listener.onResponse(successFullSlots);
|
||||
|
@ -187,7 +189,7 @@ public class TriggeredWatchStore extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
public List<Integer> putAll(final List<TriggeredWatch> triggeredWatches) throws Exception {
|
||||
public BitSet putAll(final List<TriggeredWatch> triggeredWatches) throws Exception {
|
||||
ensureStarted();
|
||||
try {
|
||||
BulkRequest request = new BulkRequest();
|
||||
|
@ -198,15 +200,14 @@ public class TriggeredWatchStore extends AbstractComponent {
|
|||
request.add(indexRequest);
|
||||
}
|
||||
BulkResponse response = client.bulk(request, (TimeValue) null);
|
||||
List<Integer> successFullSlots = new ArrayList<>();
|
||||
BitSet successFullSlots = new BitSet(triggeredWatches.size());
|
||||
for (int i = 0; i < response.getItems().length; i++) {
|
||||
BulkItemResponse itemResponse = response.getItems()[i];
|
||||
if (itemResponse.isFailed()) {
|
||||
logger.error("could store triggered watch with id [{}], because failed [{}]", itemResponse.getId(),
|
||||
itemResponse.getFailureMessage());
|
||||
} else {
|
||||
IndexResponse indexResponse = itemResponse.getResponse();
|
||||
successFullSlots.add(i);
|
||||
successFullSlots.set(i);
|
||||
}
|
||||
}
|
||||
return successFullSlots;
|
||||
|
|
Loading…
Reference in New Issue