Added more trace logging.
Enabled trace logging for RecoveryPercolatorTests#testMultiPercolator_recovery test Cleaned up and removed unnecessary usage of concurrent collections.
This commit is contained in:
parent
0b79ba9493
commit
1d8457394d
|
@ -33,7 +33,6 @@ import org.elasticsearch.cluster.routing.ShardIterator;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.index.engine.DocumentMissingException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.percolator.PercolatorService;
|
||||
|
@ -43,9 +42,7 @@ import org.elasticsearch.transport.TransportChannel;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicIntegerArray;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
/**
|
||||
|
@ -76,22 +73,20 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
|
|||
final ClusterState clusterState = clusterService.state();
|
||||
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
|
||||
|
||||
final AtomicReferenceArray<Object> percolateRequests = new AtomicReferenceArray<Object>(request.requests().size());
|
||||
TIntArrayList getRequestSlots = new TIntArrayList();
|
||||
List<GetRequest> existingDocsRequests = new ArrayList<GetRequest>();
|
||||
for (int i = 0; i < request.requests().size(); i++) {
|
||||
PercolateRequest percolateRequest = request.requests().get(i);
|
||||
percolateRequest.startTime = System.currentTimeMillis();
|
||||
percolateRequests.set(i, percolateRequest);
|
||||
if (percolateRequest.getRequest() != null) {
|
||||
existingDocsRequests.add(percolateRequest.getRequest());
|
||||
getRequestSlots.add(i);
|
||||
}
|
||||
}
|
||||
|
||||
final List<Object> percolateRequests = new ArrayList<Object>(request.requests().size());
|
||||
// Can have a mixture of percolate requests. (normal percolate requests & percolate existing doc),
|
||||
// so we need to keep track for what percolate request we had a get request
|
||||
final AtomicIntegerArray percolateRequestSlotsWithGet = new AtomicIntegerArray(getRequestSlots.toArray());
|
||||
final TIntArrayList getRequestSlots = new TIntArrayList();
|
||||
List<GetRequest> existingDocsRequests = new ArrayList<GetRequest>();
|
||||
for (int slot = 0; slot < request.requests().size(); slot++) {
|
||||
PercolateRequest percolateRequest = request.requests().get(slot);
|
||||
percolateRequest.startTime = System.currentTimeMillis();
|
||||
percolateRequests.add(percolateRequest);
|
||||
if (percolateRequest.getRequest() != null) {
|
||||
existingDocsRequests.add(percolateRequest.getRequest());
|
||||
getRequestSlots.add(slot);
|
||||
}
|
||||
}
|
||||
|
||||
if (!existingDocsRequests.isEmpty()) {
|
||||
final MultiGetRequest multiGetRequest = new MultiGetRequest();
|
||||
|
@ -108,16 +103,18 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
|
|||
public void onResponse(MultiGetResponse multiGetItemResponses) {
|
||||
for (int i = 0; i < multiGetItemResponses.getResponses().length; i++) {
|
||||
MultiGetItemResponse itemResponse = multiGetItemResponses.getResponses()[i];
|
||||
int slot = percolateRequestSlotsWithGet.get(i);
|
||||
int slot = getRequestSlots.get(i);
|
||||
if (!itemResponse.isFailed()) {
|
||||
GetResponse getResponse = itemResponse.getResponse();
|
||||
if (getResponse.isExists()) {
|
||||
PercolateRequest originalRequest = (PercolateRequest) percolateRequests.get(slot);
|
||||
percolateRequests.set(slot, new PercolateRequest(originalRequest, getResponse.getSourceAsBytesRef()));
|
||||
} else {
|
||||
logger.trace("mpercolate existing doc, item[{}] doesn't exist", slot);
|
||||
percolateRequests.set(slot, new DocumentMissingException(null, getResponse.getType(), getResponse.getId()));
|
||||
}
|
||||
} else {
|
||||
logger.trace("mpercolate existing doc, item[{}] failure {}", slot, itemResponse.getFailure());
|
||||
percolateRequests.set(slot, itemResponse.getFailure());
|
||||
}
|
||||
}
|
||||
|
@ -138,29 +135,29 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
|
|||
|
||||
private class ASyncAction {
|
||||
|
||||
final ActionListener<MultiPercolateResponse> listener;
|
||||
final ActionListener<MultiPercolateResponse> finalListener;
|
||||
final Map<ShardId, TransportShardMultiPercolateAction.Request> requestsByShard;
|
||||
final AtomicReferenceArray<Object> percolateRequests;
|
||||
final List<Object> percolateRequests;
|
||||
|
||||
final Map<ShardId, TIntArrayList> shardToSlots;
|
||||
final AtomicInteger expectedOperations;
|
||||
final AtomicArray<Object> reducedResponses;
|
||||
final ConcurrentMap<ShardId, AtomicIntegerArray> shardToSlots;
|
||||
final AtomicReferenceArray<AtomicInteger> expectedOperationsPerItem;
|
||||
final AtomicReferenceArray<AtomicReferenceArray> responsesByItemAndShard;
|
||||
|
||||
ASyncAction(AtomicReferenceArray<Object> percolateRequests, ActionListener<MultiPercolateResponse> listener, ClusterState clusterState) {
|
||||
this.listener = listener;
|
||||
ASyncAction(List<Object> percolateRequests, ActionListener<MultiPercolateResponse> finalListener, ClusterState clusterState) {
|
||||
this.finalListener = finalListener;
|
||||
this.percolateRequests = percolateRequests;
|
||||
responsesByItemAndShard = new AtomicReferenceArray<AtomicReferenceArray>(percolateRequests.length());
|
||||
expectedOperationsPerItem = new AtomicReferenceArray<AtomicInteger>(percolateRequests.length());
|
||||
reducedResponses = new AtomicArray<Object>(percolateRequests.length());
|
||||
responsesByItemAndShard = new AtomicReferenceArray<AtomicReferenceArray>(percolateRequests.size());
|
||||
expectedOperationsPerItem = new AtomicReferenceArray<AtomicInteger>(percolateRequests.size());
|
||||
reducedResponses = new AtomicArray<Object>(percolateRequests.size());
|
||||
|
||||
// Resolving concrete indices and routing and grouping the requests by shard
|
||||
requestsByShard = new HashMap<ShardId, TransportShardMultiPercolateAction.Request>();
|
||||
// Keep track what slots belong to what shard, in case a request to a shard fails on all copies
|
||||
Map<ShardId, TIntArrayList> shardToSlotsBuilder = new HashMap<ShardId, TIntArrayList>();
|
||||
shardToSlots = new HashMap<ShardId, TIntArrayList>();
|
||||
int expectedResults = 0;
|
||||
for (int slot = 0; slot < percolateRequests.length(); slot++) {
|
||||
for (int slot = 0; slot < percolateRequests.size(); slot++) {
|
||||
Object element = percolateRequests.get(slot);
|
||||
assert element != null;
|
||||
if (element instanceof PercolateRequest) {
|
||||
|
@ -169,7 +166,7 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
|
|||
Map<String, Set<String>> routing = clusterState.metaData().resolveSearchRouting(percolateRequest.routing(), percolateRequest.indices());
|
||||
// TODO: I only need shardIds, ShardIterator(ShardRouting) is only needed in TransportShardMultiPercolateAction
|
||||
GroupShardsIterator shards = clusterService.operationRouting().searchShards(
|
||||
clusterState, percolateRequest.indices(), concreteIndices, routing, percolateRequest.preference()
|
||||
clusterState, percolateRequest.indices(), concreteIndices, routing, null
|
||||
);
|
||||
|
||||
responsesByItemAndShard.set(slot, new AtomicReferenceArray(shards.size()));
|
||||
|
@ -180,27 +177,24 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
|
|||
if (requests == null) {
|
||||
requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(shard.shardId().getIndex(), shardId.id(), percolateRequest.preference()));
|
||||
}
|
||||
logger.trace("Adding shard[{}] percolate request for item[{}]", shardId, slot);
|
||||
requests.add(new TransportShardMultiPercolateAction.Request.Item(slot, new PercolateShardRequest(shardId, percolateRequest)));
|
||||
|
||||
TIntArrayList items = shardToSlotsBuilder.get(shardId);
|
||||
TIntArrayList items = shardToSlots.get(shardId);
|
||||
if (items == null) {
|
||||
shardToSlotsBuilder.put(shardId, items = new TIntArrayList());
|
||||
shardToSlots.put(shardId, items = new TIntArrayList());
|
||||
}
|
||||
items.add(slot);
|
||||
}
|
||||
expectedResults++;
|
||||
} else if (element instanceof Throwable || element instanceof MultiGetResponse.Failure) {
|
||||
logger.trace("item[{}] won't be executed, reason: {}", slot, element);
|
||||
reducedResponses.set(slot, element);
|
||||
responsesByItemAndShard.set(slot, new AtomicReferenceArray(0));
|
||||
expectedOperationsPerItem.set(slot, new AtomicInteger(0));
|
||||
}
|
||||
}
|
||||
expectedOperations = new AtomicInteger(expectedResults);
|
||||
// Move slot to shard tracking from normal map to concurrent save map
|
||||
shardToSlots = ConcurrentCollections.newConcurrentMap();
|
||||
for (Map.Entry<ShardId, TIntArrayList> entry : shardToSlotsBuilder.entrySet()) {
|
||||
shardToSlots.put(entry.getKey(), new AtomicIntegerArray(entry.getValue().toArray()));
|
||||
}
|
||||
}
|
||||
|
||||
void run() {
|
||||
|
@ -209,21 +203,20 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
|
|||
return;
|
||||
}
|
||||
|
||||
logger.trace("mpercolate executing for shards {}", requestsByShard.keySet());
|
||||
for (Map.Entry<ShardId, TransportShardMultiPercolateAction.Request> entry : requestsByShard.entrySet()) {
|
||||
final int shardId = entry.getKey().id();
|
||||
final String index = entry.getKey().index().name();
|
||||
|
||||
final ShardId shardId = entry.getKey();
|
||||
TransportShardMultiPercolateAction.Request shardRequest = entry.getValue();
|
||||
shardMultiPercolateAction.execute(shardRequest, new ActionListener<TransportShardMultiPercolateAction.Response>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(TransportShardMultiPercolateAction.Response response) {
|
||||
onShardResponse(new ShardId(index, shardId), response);
|
||||
onShardResponse(shardId, response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
onShardFailure(new ShardId(index, shardId), e);
|
||||
onShardFailure(shardId, e);
|
||||
}
|
||||
|
||||
});
|
||||
|
@ -232,6 +225,7 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
void onShardResponse(ShardId shardId, TransportShardMultiPercolateAction.Response response) {
|
||||
logger.debug("{} Percolate shard response", shardId);
|
||||
try {
|
||||
for (TransportShardMultiPercolateAction.Response.Item item : response.items()) {
|
||||
AtomicReferenceArray shardResults = responsesByItemAndShard.get(item.slot());
|
||||
|
@ -255,16 +249,16 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
|
|||
}
|
||||
} catch (Throwable e) {
|
||||
logger.error("{} Percolate original reduce error", e, shardId);
|
||||
listener.onFailure(e);
|
||||
finalListener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
void onShardFailure(ShardId shardId, Throwable e) {
|
||||
logger.debug("Shard multi percolate failure", e);
|
||||
logger.debug("{} Shard multi percolate failure", e, shardId);
|
||||
try {
|
||||
AtomicIntegerArray slots = shardToSlots.get(shardId);
|
||||
for (int i = 0; i < slots.length(); i++) {
|
||||
TIntArrayList slots = shardToSlots.get(shardId);
|
||||
for (int i = 0; i < slots.size(); i++) {
|
||||
int slot = slots.get(i);
|
||||
AtomicReferenceArray shardResults = responsesByItemAndShard.get(slot);
|
||||
if (shardResults == null) {
|
||||
|
@ -279,7 +273,7 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
|
|||
}
|
||||
} catch (Throwable t) {
|
||||
logger.error("{} Percolate original reduce error, original error {}", t, shardId, e);
|
||||
listener.onFailure(t);
|
||||
finalListener.onFailure(t);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -306,7 +300,7 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
|
|||
finalResponse[slot] = new MultiPercolateResponse.Item(((MultiGetResponse.Failure)element).getMessage());
|
||||
}
|
||||
}
|
||||
listener.onResponse(new MultiPercolateResponse(finalResponse));
|
||||
finalListener.onResponse(new MultiPercolateResponse(finalResponse));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -123,8 +123,6 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
|
|||
private String preference;
|
||||
private List<Item> items;
|
||||
|
||||
private volatile boolean done = false;
|
||||
|
||||
public Request() {
|
||||
}
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.gateway.Gateway;
|
||||
import org.elasticsearch.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.node.internal.InternalNode;
|
||||
import org.elasticsearch.test.integration.AbstractNodesTests;
|
||||
import org.junit.After;
|
||||
|
@ -272,6 +273,8 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
// Need to omit org.elast
|
||||
@TestLogging("action.percolate:TRACE")
|
||||
public void testMultiPercolator_recovery() throws Exception {
|
||||
percolatorRecovery(true);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue