clear listeners only if not null (can cause failure to properly close a shard), also, do the CLOSE check if to reschedule within the sync block

This commit is contained in:
kimchy 2011-03-28 17:20:07 +02:00
parent 953a99c75c
commit e0d8094f3d
2 changed files with 20 additions and 11 deletions

View File

@ -337,6 +337,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
// now we can close the translog service, we need to close it before the we close the shard // now we can close the translog service, we need to close it before the we close the shard
shardInjector.getInstance(TranslogService.class).close(); shardInjector.getInstance(TranslogService.class).close();
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to close translog service", e);
// ignore // ignore
} }
@ -351,18 +352,21 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
try { try {
((InternalIndexShard) indexShard).close(reason); ((InternalIndexShard) indexShard).close(reason);
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to close index shard", e);
// ignore // ignore
} }
} }
try { try {
shardInjector.getInstance(Engine.class).close(); shardInjector.getInstance(Engine.class).close();
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to close engine", e);
// ignore // ignore
} }
try { try {
shardInjector.getInstance(MergePolicyProvider.class).close(delete); shardInjector.getInstance(MergePolicyProvider.class).close(delete);
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to close merge policy provider", e);
// ignore // ignore
} }
@ -372,17 +376,20 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
shardInjector.getInstance(IndexShardGatewayService.class).snapshotOnClose(); shardInjector.getInstance(IndexShardGatewayService.class).snapshotOnClose();
} }
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to snapshot gateway on close", e);
// ignore // ignore
} }
try { try {
shardInjector.getInstance(IndexShardGatewayService.class).close(deleteGateway); shardInjector.getInstance(IndexShardGatewayService.class).close(deleteGateway);
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to close index shard gateway", e);
// ignore // ignore
} }
try { try {
// now we can close the translog // now we can close the translog
shardInjector.getInstance(Translog.class).close(delete); shardInjector.getInstance(Translog.class).close(delete);
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to close translog", e);
// ignore // ignore
} }

View File

@ -442,10 +442,12 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} }
public void close(String reason) { public void close(String reason) {
synchronized (mutex) {
if (listeners != null) {
listeners.clear(); listeners.clear();
}
listeners = null; listeners = null;
indexSettingsService.removeListener(applyRefreshSettings); indexSettingsService.removeListener(applyRefreshSettings);
synchronized (mutex) {
if (state != IndexShardState.CLOSED) { if (state != IndexShardState.CLOSED) {
if (refreshScheduledFuture != null) { if (refreshScheduledFuture != null) {
refreshScheduledFuture.cancel(true); refreshScheduledFuture.cancel(true);
@ -609,8 +611,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override public void run() { @Override public void run() {
// we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule // we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule
if (!engine().refreshNeeded()) { if (!engine().refreshNeeded()) {
if (state != IndexShardState.CLOSED) {
synchronized (mutex) { synchronized (mutex) {
if (state != IndexShardState.CLOSED) {
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, this); refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, this);
} }
} }
@ -637,8 +639,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} catch (Exception e) { } catch (Exception e) {
logger.warn("Failed to perform scheduled engine refresh", e); logger.warn("Failed to perform scheduled engine refresh", e);
} }
if (state != IndexShardState.CLOSED) {
synchronized (mutex) { synchronized (mutex) {
if (state != IndexShardState.CLOSED) {
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, EngineRefresher.this); refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, EngineRefresher.this);
} }
} }
@ -650,8 +652,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private class EngineMerger implements Runnable { private class EngineMerger implements Runnable {
@Override public void run() { @Override public void run() {
if (!engine().possibleMergeNeeded()) { if (!engine().possibleMergeNeeded()) {
if (state != IndexShardState.CLOSED) {
synchronized (mutex) { synchronized (mutex) {
if (state != IndexShardState.CLOSED) {
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, this); mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, this);
} }
} }
@ -678,8 +680,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} catch (Exception e) { } catch (Exception e) {
logger.warn("Failed to perform scheduled engine optimize/merge", e); logger.warn("Failed to perform scheduled engine optimize/merge", e);
} }
if (state != IndexShardState.CLOSED) {
synchronized (mutex) { synchronized (mutex) {
if (state != IndexShardState.CLOSED) {
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, EngineMerger.this); mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, EngineMerger.this);
} }
} }