[RECOVERY] Release store lock before blocking on mapping updates

This can lead to sporadic shard creating timeouts if the same shard is
created, closed and created again on the same node. The reason for this is
that we holding on to the store reference while blocking on the mapping update
that will prevent the shard lock from being released. Holding the lock is unnecessary
in this case and can simply be removed.
This commit is contained in:
Simon Willnauer 2014-12-30 17:11:06 +01:00
parent 707025fb7a
commit 54ce210c8e
1 changed files with 24 additions and 26 deletions

View File

@ -103,6 +103,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
recoveryState.setStage(RecoveryState.Stage.INDEX); recoveryState.setStage(RecoveryState.Stage.INDEX);
long version = -1; long version = -1;
long translogId = -1; long translogId = -1;
final Set<String> typesToUpdate = Sets.newHashSet();
indexShard.store().incRef(); indexShard.store().incRef();
try { try {
try { try {
@ -232,7 +233,6 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
recoveryState.setStage(RecoveryState.Stage.TRANSLOG); recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
StreamInput in = null; StreamInput in = null;
final Set<String> typesToUpdate = Sets.newHashSet();
try { try {
logger.trace("recovering translog file: {} length: {}", recoveringTranslogFile, Files.size(recoveringTranslogFile)); logger.trace("recovering translog file: {} length: {}", recoveringTranslogFile, Files.size(recoveringTranslogFile));
TranslogStream stream = TranslogStreams.translogStreamFor(recoveringTranslogFile); TranslogStream stream = TranslogStreams.translogStreamFor(recoveringTranslogFile);
@ -291,35 +291,33 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
} catch (Exception ex) { } catch (Exception ex) {
logger.debug("Failed to delete recovering translog file {}", ex, recoveringTranslogFile); logger.debug("Failed to delete recovering translog file {}", ex, recoveringTranslogFile);
} }
for (final String type : typesToUpdate) {
final CountDownLatch latch = new CountDownLatch(1);
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), indexService.mapperService().documentMapper(type), indexService.indexUUID(), new MappingUpdatedAction.MappingUpdateListener() {
@Override
public void onMappingUpdate() {
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
latch.countDown();
logger.debug("failed to send mapping update post recovery to master for [{}]", t, type);
}
});
try {
boolean waited = latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS);
if (!waited) {
logger.debug("waited for mapping update on master for [{}], yet timed out");
}
} catch (InterruptedException e) {
logger.debug("interrupted while waiting for mapping update");
}
}
} finally { } finally {
indexShard.store().decRef(); indexShard.store().decRef();
} }
for (final String type : typesToUpdate) {
final CountDownLatch latch = new CountDownLatch(1);
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), indexService.mapperService().documentMapper(type), indexService.indexUUID(), new MappingUpdatedAction.MappingUpdateListener() {
@Override
public void onMappingUpdate() {
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
latch.countDown();
logger.debug("failed to send mapping update post recovery to master for [{}]", t, type);
}
});
try {
boolean waited = latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS);
if (!waited) {
logger.debug("waited for mapping update on master for [{}], yet timed out");
}
} catch (InterruptedException e) {
logger.debug("interrupted while waiting for mapping update");
}
}
recoveryState.getTranslog().time(System.currentTimeMillis() - recoveryState.getTranslog().startTime()); recoveryState.getTranslog().time(System.currentTimeMillis() - recoveryState.getTranslog().startTime());
} }