Invoke the DocumentType listeners *before* the mappers are updated. This fixes an issue where if two or more concurrent percolate index requests are processed, the first request would the `.percolator` type mapping, but the real time percolator listener wouldn't be active, this would result in that the subsequent concurrent requests wouldn't be parsed and kept in memory and would never be included in any percolate api result. This issue any occurred when `.percolator` type is created on the fly.

Also made the call to PercolatorQueriesRegistry#enableRealTimePercolator and #disableRealTimePercolator synchronized, so that for the same shard the RealTimePercolatorOperationListener can't registered twice.
This commit is contained in:
Martijn van Groningen 2013-12-11 17:41:10 +01:00
parent a3f1c428c2
commit 92c32dca9e
2 changed files with 4 additions and 8 deletions

View File

@ -290,10 +290,10 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
addObjectMappers(objectMappersAgg.mappers.toArray(new ObjectMapper[objectMappersAgg.mappers.size()])); addObjectMappers(objectMappersAgg.mappers.toArray(new ObjectMapper[objectMappersAgg.mappers.size()]));
mapper.addObjectMapperListener(objectMapperListener, false); mapper.addObjectMapperListener(objectMapperListener, false);
mappers = newMapBuilder(mappers).put(mapper.type(), mapper).map();
for (DocumentTypeListener typeListener : typeListeners) { for (DocumentTypeListener typeListener : typeListeners) {
typeListener.created(mapper.type()); typeListener.created(mapper.type());
} }
mappers = newMapBuilder(mappers).put(mapper.type(), mapper).map();
return mapper; return mapper;
} }
} }

View File

@ -60,7 +60,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
private final RealTimePercolatorOperationListener realTimePercolatorOperationListener = new RealTimePercolatorOperationListener(); private final RealTimePercolatorOperationListener realTimePercolatorOperationListener = new RealTimePercolatorOperationListener();
private final PercolateTypeListener percolateTypeListener = new PercolateTypeListener(); private final PercolateTypeListener percolateTypeListener = new PercolateTypeListener();
private volatile boolean realTimePercolatorEnabled = false; private boolean realTimePercolatorEnabled = false;
@Inject @Inject
public PercolatorQueriesRegistry(ShardId shardId, @IndexSettings Settings indexSettings, IndexQueryParserService queryParserService, public PercolatorQueriesRegistry(ShardId shardId, @IndexSettings Settings indexSettings, IndexQueryParserService queryParserService,
@ -94,19 +94,15 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
percolateQueries.clear(); percolateQueries.clear();
} }
void enableRealTimePercolator() { synchronized void enableRealTimePercolator() {
logger.trace("trying to enable realtime percolation for index [{}] and shard[{}] on node[{}]", shardId.index(), shardId.id(), nodeName());
if (!realTimePercolatorEnabled) { if (!realTimePercolatorEnabled) {
logger.debug("enabling realtime percolating for index [{}] and shard[{}] on node[{}]", shardId.index(), shardId.id(), nodeName());
indexingService.addListener(realTimePercolatorOperationListener); indexingService.addListener(realTimePercolatorOperationListener);
realTimePercolatorEnabled = true; realTimePercolatorEnabled = true;
} }
} }
void disableRealTimePercolator() { synchronized void disableRealTimePercolator() {
logger.trace("trying to disable realtime percolation for index [{}] and shard[{}] on node[{}]", shardId.index(), shardId.id(), nodeName());
if (realTimePercolatorEnabled) { if (realTimePercolatorEnabled) {
logger.debug("disabling realtime percolating for index [{}] and shard[{}] on node[{}]", shardId.index(), shardId.id(), nodeName());
indexingService.removeListener(realTimePercolatorOperationListener); indexingService.removeListener(realTimePercolatorOperationListener);
realTimePercolatorEnabled = false; realTimePercolatorEnabled = false;
} }