comments for SyncedFlushService

This commit is contained in:
Britta Weber 2015-05-17 22:12:31 +02:00
parent 9ca17f834b
commit 6ed472f873
1 changed files with 21 additions and 19 deletions

View File

@ -51,6 +51,7 @@ import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -110,7 +111,7 @@ public class SyncedFlushService extends AbstractComponent {
* Tries to flush all copies of a shard and write a sync id to it.
* After a synced flush two shard copies may only contain the same sync id if they contain the same documents.
* To ensure this, synced flush works in three steps:
* 1. Flush all shard copies and gather the commit points for each copy after the flush
* 1. Flush all shard copies and gather the commit ids for each copy after the flush
* 2. Ensure that there are no ongoing indexing operations on the primary
* 3. Perform an additional flush on each shard copy that writes the sync id
*
@ -143,7 +144,8 @@ public class SyncedFlushService extends AbstractComponent {
}
int inflight = getInflightOpsCount(shardId, state, shardRoutingTable);
if (inflight != 1) {
assert inflight >= -1;
if (inflight != 1) { // 1 means that there are no write operations are in flight (>1) and the shard is not closed (0).
actionListener.onResponse(new SyncedFlushResult(shardId, "operation counter on primary is non zero [" + inflight + "]"));
}
String syncId = Strings.base64UUID();
@ -213,6 +215,7 @@ public class SyncedFlushService extends AbstractComponent {
}
} catch (InterruptedException e) {
logger.debug("{} interrupted while waiting for in flight operation check", shardId);
Thread.currentThread().interrupt();
}
final int count = result.get();
logger.trace("{} in flight operation count [{}]", shardId, count);
@ -220,26 +223,22 @@ public class SyncedFlushService extends AbstractComponent {
}
void sendSyncRequests(final String syncId, List<ShardRouting> shards, ClusterState state, Map<String, Engine.CommitId> expectedCommitIds, final ShardId shardId, final ActionListener<SyncedFlushResult> listener) {
final CountDown countDownLatch = new CountDown(shards.size());
void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, Engine.CommitId> expectedCommitIds, final ShardId shardId, final ActionListener<SyncedFlushResult> listener) {
final CountDown countDown = new CountDown(shards.size());
final Map<ShardRouting, SyncedFlushResponse> results = ConcurrentCollections.newConcurrentMap();
for (final ShardRouting shard : shards) {
final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
if (node == null) {
logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
results.put(shard, new SyncedFlushResponse("unknown node"));
if (countDownLatch.countDown()) {
listener.onResponse(new SyncedFlushResult(shardId, syncId, results));
}
contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results);
continue;
}
final Engine.CommitId expectedCommitId = expectedCommitIds.get(shard.currentNodeId());
if (expectedCommitId == null) {
logger.trace("{} can't resolve expected commit id for {}, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
results.put(shard, new SyncedFlushResponse("no commit id from pre-sync flush"));
if (countDownLatch.countDown()) {
listener.onResponse(new SyncedFlushResult(shardId, syncId, results));
}
contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results);
continue;
}
logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId);
@ -255,18 +254,14 @@ public class SyncedFlushService extends AbstractComponent {
SyncedFlushResponse existing = results.put(shard, response);
assert existing == null : "got two answers for node [" + node + "]";
// count after the assert so we won't decrement twice in handleException
if (countDownLatch.countDown()) {
listener.onResponse(new SyncedFlushResult(shardId, syncId, results));
}
contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results);
}
@Override
public void handleException(TransportException exp) {
logger.trace("{} error while performing synced flush on [{}], skipping", exp, shardId, shard);
results.put(shard, new SyncedFlushResponse(exp.getMessage()));
if (countDownLatch.countDown()) {
listener.onResponse(new SyncedFlushResult(shardId, syncId, results));
}
contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results);
}
@Override
@ -278,12 +273,19 @@ public class SyncedFlushService extends AbstractComponent {
}
private void contDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, ActionListener<SyncedFlushResult> listener, CountDown countDown, Map<ShardRouting, SyncedFlushResponse> results) {
if (countDown.countDown()) {
assert results.size() == shards.size();
listener.onResponse(new SyncedFlushResult(shardId, syncId, results));
}
}
/**
* send presync requests to all started copies of the given shard
*/
Map<String, Engine.CommitId> sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId) {
final CountDownLatch countDownLatch = new CountDownLatch(shards.size());
final Map<String, Engine.CommitId> commitIds = ConcurrentCollections.newConcurrentMap();
final ConcurrentMap<String, Engine.CommitId> commitIds = ConcurrentCollections.newConcurrentMap();
for (final ShardRouting shard : shards) {
logger.trace("{} sending pre-synced flush request to {}", shardId, shard);
final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
@ -300,7 +302,7 @@ public class SyncedFlushService extends AbstractComponent {
@Override
public void handleResponse(PreSyncedFlushResponse response) {
Engine.CommitId existing = commitIds.put(node.id(), response.commitId());
Engine.CommitId existing = commitIds.putIfAbsent(node.id(), response.commitId());
assert existing == null : "got two answers for node [" + node + "]";
// count after the assert so we won't decrement twice in handleException
countDownLatch.countDown();
@ -324,6 +326,7 @@ public class SyncedFlushService extends AbstractComponent {
}
} catch (InterruptedException e) {
logger.debug("{} interrupted while waiting for presync requests", shardId);
Thread.currentThread().interrupt();
}
return commitIds;
@ -488,7 +491,6 @@ public class SyncedFlushService extends AbstractComponent {
final static class PreSyncedFlushRequest extends TransportRequest {
private ShardId shardId;
PreSyncedFlushRequest() {
}