Make SyncedFlushService fully asynchronous

Some requests in the SyncedFlushService were sill blocking on network
calls which made calling this service error prone if done on a network
thread. This commit makes this service fully async based on ActionListener.
This commit is contained in:
Simon Willnauer 2015-05-19 12:01:31 +02:00
parent d817793f9b
commit 55f99dfce6
5 changed files with 97 additions and 83 deletions

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
@ -62,16 +63,9 @@ public class SyncedFlushService extends AbstractComponent {
public static final String SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/sync";
public static final String IN_FLIGHT_OPS_ACTION_NAME = "internal:indices/flush/synced/in_flight";
public static final String SETTING_PRE_SYNC_TIMEOUT = "indices.flush.synced.presync_timeout";
public static final String SETTING_SYNC_TIMEOUT = "indices.flush.synced.sync_timeout";
public static final String SETTING_IN_FLIGHT_OPS_TIMEOUT = "indices.flush.synced.in_flight_ops_timeout";
private final IndicesService indicesService;
private final ClusterService clusterService;
private final TransportService transportService;
private final TimeValue preSyncTimeout;
private final TimeValue syncTimeout;
private final TimeValue inflightOpsTimeout;
@Inject
public SyncedFlushService(Settings settings, IndicesService indicesService, ClusterService clusterService, TransportService transportService) {
@ -83,9 +77,6 @@ public class SyncedFlushService extends AbstractComponent {
transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME, PreSyncedFlushRequest.class, ThreadPool.Names.FLUSH, new PreSyncedFlushTransportHandler());
transportService.registerRequestHandler(SYNCED_FLUSH_ACTION_NAME, SyncedFlushRequest.class, ThreadPool.Names.FLUSH, new SyncedFlushTransportHandler());
transportService.registerRequestHandler(IN_FLIGHT_OPS_ACTION_NAME, InFlightOpsRequest.class, ThreadPool.Names.SAME, new InFlightOpCountTransportHandler());
preSyncTimeout = settings.getAsTime(SETTING_PRE_SYNC_TIMEOUT, TimeValue.timeValueMinutes(5));
syncTimeout = settings.getAsTime(SETTING_SYNC_TIMEOUT, TimeValue.timeValueMinutes(5));
inflightOpsTimeout = settings.getAsTime(SETTING_IN_FLIGHT_OPS_TIMEOUT, TimeValue.timeValueMinutes(5));
indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() {
@Override
public void onShardInactive(final IndexShard indexShard) {
@ -132,27 +123,49 @@ public class SyncedFlushService extends AbstractComponent {
* the replica if it contains the same changes that the primary contains.
*
* Synced flush is a best effort operation. The sync id may be written on all, some or none of the copies.
*
* **/
public void attemptSyncedFlush(ShardId shardId, ActionListener<SyncedFlushResult> actionListener) {
**/
public void attemptSyncedFlush(final ShardId shardId, final ActionListener<SyncedFlushResult> actionListener) {
try {
final ClusterState state = clusterService.state();
final IndexShardRoutingTable shardRoutingTable = getActiveShardRoutings(shardId, state);
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
Map<String, Engine.CommitId> commitIds = sendPreSyncRequests(activeShards, state, shardId);
final ActionListener<Map<String, Engine.CommitId>> commitIdsListener = new ActionListener<Map<String, Engine.CommitId>>() {
@Override
public void onResponse(final Map<String, Engine.CommitId> commitIds) {
if (commitIds.isEmpty()) {
actionListener.onResponse(new SyncedFlushResult(shardId, "all shards failed to commit on pre-sync"));
}
int inflight = getInflightOpsCount(shardId, state, shardRoutingTable);
final ActionListener<InFlightOpsResponse> inflightOpsListener = new ActionListener<InFlightOpsResponse>() {
@Override
public void onResponse(InFlightOpsResponse response) {
final int inflight = response.opCount();
assert inflight >= -1;
if (inflight != 1) { // 1 means that there are no write operations are in flight (>1) and the shard is not closed (0).
actionListener.onResponse(new SyncedFlushResult(shardId, "operation counter on primary is non zero [" + inflight + "]"));
}
} else {
// 3. now send the sync request to all the shards
String syncId = Strings.base64UUID();
sendSyncRequests(syncId, activeShards, state, commitIds, shardId, actionListener);
}
}
@Override
public void onFailure(Throwable e) {
actionListener.onFailure(e);
}
};
// 2. fetch in flight operations
getInflightOpsCount(shardId, state, shardRoutingTable, inflightOpsListener);
}
@Override
public void onFailure(Throwable e) {
actionListener.onFailure(e);
}
};
// 1. send pre-sync flushes to all replicas
sendPreSyncRequests(activeShards, state, shardId, commitIdsListener);
} catch (Throwable t) {
actionListener.onFailure(t);
}
@ -177,15 +190,14 @@ public class SyncedFlushService extends AbstractComponent {
/**
* returns the number of inflight operations on primary. -1 upon error.
*/
protected int getInflightOpsCount(final ShardId shardId, ClusterState state, IndexShardRoutingTable shardRoutingTable) {
protected void getInflightOpsCount(final ShardId shardId, ClusterState state, IndexShardRoutingTable shardRoutingTable, final ActionListener<InFlightOpsResponse> listener) {
final ShardRouting primaryShard = shardRoutingTable.primaryShard();
final DiscoveryNode primaryNode = state.nodes().get(primaryShard.currentNodeId());
if (primaryNode == null) {
logger.trace("{} failed to resolve node for primary shard {}, skipping sync", shardId, primaryShard);
return -1;
listener.onResponse(new InFlightOpsResponse(-1));
return;
}
final AtomicInteger result = new AtomicInteger(-1);
final CountDownLatch latch = new CountDownLatch(1);
logger.trace("{} retrieving in flight operation count", shardId);
transportService.sendRequest(primaryNode, IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpsRequest(shardId),
new BaseTransportResponseHandler<InFlightOpsResponse>() {
@ -196,15 +208,13 @@ public class SyncedFlushService extends AbstractComponent {
@Override
public void handleResponse(InFlightOpsResponse response) {
result.set(response.opCount());
latch.countDown();
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
logger.debug("{} unexpected error while retrieving inflight op count", shardId);
result.set(-1);
latch.countDown();
listener.onFailure(exp);
}
@Override
@ -212,17 +222,6 @@ public class SyncedFlushService extends AbstractComponent {
return ThreadPool.Names.SAME;
}
});
try {
if (latch.await(inflightOpsTimeout.millis(), TimeUnit.MILLISECONDS) == false) {
logger.debug("{} in flight operation check timed out after [{}]", shardId, syncTimeout);
}
} catch (InterruptedException e) {
logger.debug("{} interrupted while waiting for in flight operation check", shardId);
Thread.currentThread().interrupt();
}
final int count = result.get();
logger.trace("{} in flight operation count [{}]", shardId, count);
return count;
}
@ -286,15 +285,17 @@ public class SyncedFlushService extends AbstractComponent {
/**
* send presync requests to all started copies of the given shard
*/
Map<String, Engine.CommitId> sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId) {
final CountDownLatch countDownLatch = new CountDownLatch(shards.size());
void sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId, final ActionListener<Map<String, Engine.CommitId>> listener) {
final CountDown countDown = new CountDown(shards.size());
final ConcurrentMap<String, Engine.CommitId> commitIds = ConcurrentCollections.newConcurrentMap();
for (final ShardRouting shard : shards) {
logger.trace("{} sending pre-synced flush request to {}", shardId, shard);
final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
if (node == null) {
logger.trace("{} shard routing {} refers to an unknown node. skipping.", shardId, shard);
countDownLatch.countDown();
if(countDown.countDown()) {
listener.onResponse(commitIds);
}
continue;
}
transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreSyncedFlushRequest(shard.shardId()), new BaseTransportResponseHandler<PreSyncedFlushResponse>() {
@ -308,13 +309,17 @@ public class SyncedFlushService extends AbstractComponent {
Engine.CommitId existing = commitIds.putIfAbsent(node.id(), response.commitId());
assert existing == null : "got two answers for node [" + node + "]";
// count after the assert so we won't decrement twice in handleException
countDownLatch.countDown();
if(countDown.countDown()) {
listener.onResponse(commitIds);
}
}
@Override
public void handleException(TransportException exp) {
logger.trace("{} error while performing pre synced flush on [{}], skipping", shardId, exp, shard);
countDownLatch.countDown();
if(countDown.countDown()) {
listener.onResponse(commitIds);
}
}
@Override
@ -323,16 +328,6 @@ public class SyncedFlushService extends AbstractComponent {
}
});
}
try {
if (countDownLatch.await(preSyncTimeout.millis(), TimeUnit.MILLISECONDS) == false) {
logger.debug("{} waiting for pre sync flush requests timed out after [{}]. pending ops [{}]", shardId, preSyncTimeout, countDownLatch.getCount());
}
} catch (InterruptedException e) {
logger.debug("{} interrupted while waiting for presync requests", shardId);
Thread.currentThread().interrupt();
}
return commitIds;
}
private PreSyncedFlushResponse performPreSyncedFlush(PreSyncedFlushRequest request) {

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -40,7 +39,7 @@ import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.SyncedFlushUtil;
import org.elasticsearch.indices.SyncedFlushUtil;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.junit.Test;

View File

@ -31,7 +31,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.SyncedFlushUtil;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;

View File

@ -28,7 +28,6 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.elasticsearch.test.SyncedFlushUtil;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import java.util.List;
@ -50,11 +49,11 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state);
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
assertEquals("exactly one active shard", 1, activeShards.size());
Map<String, Engine.CommitId> commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId);
Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
assertEquals("exactly one commit id", 1, commitIds.size());
client().prepareIndex("test", "test", "2").setSource("{}").get();
String syncId = Strings.base64UUID();
SyncedFlushUtil.SyncResultListener listener = new SyncedFlushUtil.SyncResultListener();
SyncedFlushUtil.LatchedListener<SyncedFlushService.SyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId,listener);
listener.latch.await();
assertNull(listener.error);
@ -67,8 +66,8 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
assertFalse(syncedFlushResult.shardResponses().get(activeShards.get(0)).success());
assertEquals("pending operations", syncedFlushResult.shardResponses().get(activeShards.get(0)).failureReason());
flushService.sendPreSyncRequests(activeShards, state, shardId); // pull another commit and make sure we can't seal with the old one
listener = new SyncedFlushUtil.SyncResultListener();
SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); // pull another commit and make sure we can't seal with the old one
listener = new SyncedFlushUtil.LatchedListener();
flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId,listener);
listener.latch.await();
assertNull(listener.error);
@ -91,7 +90,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
final ShardId shardId = shard.shardId();
SyncedFlushUtil.SyncResultListener listener = new SyncedFlushUtil.SyncResultListener();
SyncedFlushUtil.LatchedListener<SyncedFlushService.SyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
flushService.attemptSyncedFlush(shardId, listener);
listener.latch.await();
assertNull(listener.error);
@ -114,7 +113,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
final ShardId shardId = shard.shardId();
shard.incrementOperationCounter();
try {
SyncedFlushUtil.SyncResultListener listener = new SyncedFlushUtil.SyncResultListener();
SyncedFlushUtil.LatchedListener<SyncedFlushService.SyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
flushService.attemptSyncedFlush(shardId, listener);
listener.latch.await();
assertNull(listener.error);
@ -135,7 +134,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
IndexShard shard = test.shard(0);
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
SyncedFlushUtil.SyncResultListener listener = new SyncedFlushUtil.SyncResultListener();
SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener();
flushService.attemptSyncedFlush(new ShardId("test", 1), listener);
listener.latch.await();
assertNotNull(listener.error);
@ -145,14 +144,14 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
final ShardId shardId = shard.shardId();
client().admin().indices().prepareClose("test").get();
listener = new SyncedFlushUtil.SyncResultListener();
listener = new SyncedFlushUtil.LatchedListener();
flushService.attemptSyncedFlush(shardId, listener);
listener.latch.await();
assertNotNull(listener.error);
assertNull(listener.result);
assertEquals("closed", listener.error.getMessage());
listener = new SyncedFlushUtil.SyncResultListener();
listener = new SyncedFlushUtil.LatchedListener();
flushService.attemptSyncedFlush(new ShardId("nosuchindex", 0), listener);
listener.latch.await();
assertNotNull(listener.error);
@ -172,14 +171,14 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state);
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
assertEquals("exactly one active shard", 1, activeShards.size());
Map<String, Engine.CommitId> commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId);
Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
assertEquals("exactly one commit id", 1, commitIds.size());
if (randomBoolean()) {
client().prepareIndex("test", "test", "2").setSource("{}").get();
}
client().admin().indices().prepareFlush("test").setForce(true).get();
String syncId = Strings.base64UUID();
final SyncedFlushUtil.SyncResultListener listener = new SyncedFlushUtil.SyncResultListener();
final SyncedFlushUtil.LatchedListener<SyncedFlushService.SyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, listener);
listener.latch.await();
assertNull(listener.error);
@ -206,11 +205,11 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state);
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
assertEquals("exactly one active shard", 1, activeShards.size());
Map<String, Engine.CommitId> commitIds = flushService.sendPreSyncRequests(activeShards, state, shardId);
Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
assertEquals("exactly one commit id", 1, commitIds.size());
commitIds.clear(); // wipe it...
String syncId = Strings.base64UUID();
SyncedFlushUtil.SyncResultListener listener = new SyncedFlushUtil.SyncResultListener();
SyncedFlushUtil.LatchedListener<SyncedFlushService.SyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, listener);
listener.latch.await();
assertNull(listener.error);

View File

@ -16,15 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test;
package org.elasticsearch.indices;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SyncedFlushService;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
/** Utils for SyncedFlush */
public class SyncedFlushUtil {
@ -37,7 +42,7 @@ public class SyncedFlushUtil {
* Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)}
*/
public static SyncedFlushService.SyncedFlushResult attemptSyncedFlush(SyncedFlushService service, ShardId shardId) {
SyncResultListener listener = new SyncResultListener();
LatchedListener<SyncedFlushService.SyncedFlushResult> listener = new LatchedListener();
service.attemptSyncedFlush(shardId, listener);
try {
listener.latch.await();
@ -50,14 +55,14 @@ public class SyncedFlushUtil {
return listener.result;
}
public static final class SyncResultListener implements ActionListener<SyncedFlushService.SyncedFlushResult> {
public volatile SyncedFlushService.SyncedFlushResult result;
public static final class LatchedListener<T> implements ActionListener<T> {
public volatile T result;
public volatile Throwable error;
public final CountDownLatch latch = new CountDownLatch(1);
@Override
public void onResponse(SyncedFlushService.SyncedFlushResult syncedFlushResult) {
result = syncedFlushResult;
public void onResponse(T result) {
this.result = result;
latch.countDown();
}
@ -68,4 +73,21 @@ public class SyncedFlushUtil {
}
}
/**
* Blocking version of {@link SyncedFlushService#sendPreSyncRequests(List, ClusterState, ShardId, ActionListener)}
*/
public static Map<String, Engine.CommitId> sendPreSyncRequests(SyncedFlushService service, List<ShardRouting> activeShards, ClusterState state, ShardId shardId) {
LatchedListener<Map<String, Engine.CommitId>> listener = new LatchedListener<>();
service.sendPreSyncRequests(activeShards, state, shardId, listener);
try {
listener.latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (listener.error != null) {
throw ExceptionsHelper.convertToElastic(listener.error);
}
return listener.result;
}
}