Make listeners a final field in ShardIndexingService, which fixed possible visibility issue.

This commit is contained in:
Martijn van Groningen 2013-12-11 12:22:45 +01:00
parent aee388ec46
commit a760f1f54a
1 changed files with 44 additions and 75 deletions

View File

@ -44,9 +44,9 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
private final StatsHolder totalStats = new StatsHolder(); private final StatsHolder totalStats = new StatsHolder();
private volatile Map<String, StatsHolder> typesStats = ImmutableMap.of(); private final CopyOnWriteArrayList<IndexingOperationListener> listeners = new CopyOnWriteArrayList<IndexingOperationListener>();
private CopyOnWriteArrayList<IndexingOperationListener> listeners = null; private volatile Map<String, StatsHolder> typesStats = ImmutableMap.of();
@Inject @Inject
public ShardIndexingService(ShardId shardId, @IndexSettings Settings indexSettings, ShardSlowLogIndexingService slowLog) { public ShardIndexingService(ShardId shardId, @IndexSettings Settings indexSettings, ShardSlowLogIndexingService slowLog) {
@ -81,42 +81,29 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
return new IndexingStats(total, typesSt); return new IndexingStats(total, typesSt);
} }
public synchronized void addListener(IndexingOperationListener listener) { public void addListener(IndexingOperationListener listener) {
if (listeners == null) {
listeners = new CopyOnWriteArrayList<IndexingOperationListener>();
}
listeners.add(listener); listeners.add(listener);
} }
public synchronized void removeListener(IndexingOperationListener listener) { public void removeListener(IndexingOperationListener listener) {
if (listeners == null) {
return;
}
listeners.remove(listener); listeners.remove(listener);
if (listeners.isEmpty()) {
listeners = null;
}
} }
public Engine.Create preCreate(Engine.Create create) { public Engine.Create preCreate(Engine.Create create) {
totalStats.indexCurrent.inc(); totalStats.indexCurrent.inc();
typeStats(create.type()).indexCurrent.inc(); typeStats(create.type()).indexCurrent.inc();
if (listeners != null) { for (IndexingOperationListener listener : listeners) {
for (IndexingOperationListener listener : listeners) { create = listener.preCreate(create);
create = listener.preCreate(create);
}
} }
return create; return create;
} }
public void postCreateUnderLock(Engine.Create create) { public void postCreateUnderLock(Engine.Create create) {
if (listeners != null) { for (IndexingOperationListener listener : listeners) {
for (IndexingOperationListener listener : listeners) { try {
try { listener.postCreateUnderLock(create);
listener.postCreateUnderLock(create); } catch (Exception e) {
} catch (Exception e) { logger.warn("post listener [{}] failed", e, listener);
logger.warn("post listener [{}] failed", e, listener);
}
} }
} }
} }
@ -129,13 +116,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
typeStats.indexMetric.inc(took); typeStats.indexMetric.inc(took);
typeStats.indexCurrent.dec(); typeStats.indexCurrent.dec();
slowLog.postCreate(create, took); slowLog.postCreate(create, took);
if (listeners != null) { for (IndexingOperationListener listener : listeners) {
for (IndexingOperationListener listener : listeners) { try {
try { listener.postCreate(create);
listener.postCreate(create); } catch (Exception e) {
} catch (Exception e) { logger.warn("post listener [{}] failed", e, listener);
logger.warn("post listener [{}] failed", e, listener);
}
} }
} }
} }
@ -143,22 +128,18 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
public Engine.Index preIndex(Engine.Index index) { public Engine.Index preIndex(Engine.Index index) {
totalStats.indexCurrent.inc(); totalStats.indexCurrent.inc();
typeStats(index.type()).indexCurrent.inc(); typeStats(index.type()).indexCurrent.inc();
if (listeners != null) { for (IndexingOperationListener listener : listeners) {
for (IndexingOperationListener listener : listeners) { index = listener.preIndex(index);
index = listener.preIndex(index);
}
} }
return index; return index;
} }
public void postIndexUnderLock(Engine.Index index) { public void postIndexUnderLock(Engine.Index index) {
if (listeners != null) { for (IndexingOperationListener listener : listeners) {
for (IndexingOperationListener listener : listeners) { try {
try { listener.postIndexUnderLock(index);
listener.postIndexUnderLock(index); } catch (Exception e) {
} catch (Exception e) { logger.warn("post listener [{}] failed", e, listener);
logger.warn("post listener [{}] failed", e, listener);
}
} }
} }
} }
@ -171,13 +152,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
typeStats.indexMetric.inc(took); typeStats.indexMetric.inc(took);
typeStats.indexCurrent.dec(); typeStats.indexCurrent.dec();
slowLog.postIndex(index, took); slowLog.postIndex(index, took);
if (listeners != null) { for (IndexingOperationListener listener : listeners) {
for (IndexingOperationListener listener : listeners) { try {
try { listener.postIndex(index);
listener.postIndex(index); } catch (Exception e) {
} catch (Exception e) { logger.warn("post listener [{}] failed", e, listener);
logger.warn("post listener [{}] failed", e, listener);
}
} }
} }
} }
@ -190,22 +169,18 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
public Engine.Delete preDelete(Engine.Delete delete) { public Engine.Delete preDelete(Engine.Delete delete) {
totalStats.deleteCurrent.inc(); totalStats.deleteCurrent.inc();
typeStats(delete.type()).deleteCurrent.inc(); typeStats(delete.type()).deleteCurrent.inc();
if (listeners != null) { for (IndexingOperationListener listener : listeners) {
for (IndexingOperationListener listener : listeners) { delete = listener.preDelete(delete);
delete = listener.preDelete(delete);
}
} }
return delete; return delete;
} }
public void postDeleteUnderLock(Engine.Delete delete) { public void postDeleteUnderLock(Engine.Delete delete) {
if (listeners != null) { for (IndexingOperationListener listener : listeners) {
for (IndexingOperationListener listener : listeners) { try {
try { listener.postDeleteUnderLock(delete);
listener.postDeleteUnderLock(delete); } catch (Exception e) {
} catch (Exception e) { logger.warn("post listener [{}] failed", e, listener);
logger.warn("post listener [{}] failed", e, listener);
}
} }
} }
} }
@ -217,13 +192,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
StatsHolder typeStats = typeStats(delete.type()); StatsHolder typeStats = typeStats(delete.type());
typeStats.deleteMetric.inc(took); typeStats.deleteMetric.inc(took);
typeStats.deleteCurrent.dec(); typeStats.deleteCurrent.dec();
if (listeners != null) { for (IndexingOperationListener listener : listeners) {
for (IndexingOperationListener listener : listeners) { try {
try { listener.postDelete(delete);
listener.postDelete(delete); } catch (Exception e) {
} catch (Exception e) { logger.warn("post listener [{}] failed", e, listener);
logger.warn("post listener [{}] failed", e, listener);
}
} }
} }
} }
@ -234,19 +207,15 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
} }
public Engine.DeleteByQuery preDeleteByQuery(Engine.DeleteByQuery deleteByQuery) { public Engine.DeleteByQuery preDeleteByQuery(Engine.DeleteByQuery deleteByQuery) {
if (listeners != null) { for (IndexingOperationListener listener : listeners) {
for (IndexingOperationListener listener : listeners) { deleteByQuery = listener.preDeleteByQuery(deleteByQuery);
deleteByQuery = listener.preDeleteByQuery(deleteByQuery);
}
} }
return deleteByQuery; return deleteByQuery;
} }
public void postDeleteByQuery(Engine.DeleteByQuery deleteByQuery) { public void postDeleteByQuery(Engine.DeleteByQuery deleteByQuery) {
if (listeners != null) { for (IndexingOperationListener listener : listeners) {
for (IndexingOperationListener listener : listeners) { listener.postDeleteByQuery(deleteByQuery);
listener.postDeleteByQuery(deleteByQuery);
}
} }
} }