mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 18:35:25 +00:00
move remaining CHM construction to the factory method
This commit is contained in:
parent
de3c74ab96
commit
95fc7a39a3
@ -31,6 +31,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
|||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.inject.internal.Nullable;
|
import org.elasticsearch.common.inject.internal.Nullable;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.discovery.InitialStateDiscoveryListener;
|
import org.elasticsearch.discovery.InitialStateDiscoveryListener;
|
||||||
import org.elasticsearch.node.service.NodeService;
|
import org.elasticsearch.node.service.NodeService;
|
||||||
@ -70,7 +71,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
|
|||||||
private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<InitialStateDiscoveryListener>();
|
private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<InitialStateDiscoveryListener>();
|
||||||
|
|
||||||
// use CHM here and not ConcurrentMaps#new since we want to be able to agentify this using TC later on...
|
// use CHM here and not ConcurrentMaps#new since we want to be able to agentify this using TC later on...
|
||||||
private static final ConcurrentMap<ClusterName, ClusterGroup> clusterGroups = new ConcurrentHashMap<ClusterName, ClusterGroup>();
|
private static final ConcurrentMap<ClusterName, ClusterGroup> clusterGroups = ConcurrentCollections.newConcurrentMap();
|
||||||
|
|
||||||
private static final AtomicLong nodeIdGenerator = new AtomicLong();
|
private static final AtomicLong nodeIdGenerator = new AtomicLong();
|
||||||
|
|
||||||
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||||||
import org.elasticsearch.common.network.NetworkService;
|
import org.elasticsearch.common.network.NetworkService;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
|
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
|
||||||
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
|
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
|
||||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
||||||
@ -149,7 +150,7 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
|
|||||||
|
|
||||||
private final AtomicInteger counter;
|
private final AtomicInteger counter;
|
||||||
|
|
||||||
private ConcurrentMap<DiscoveryNode, PingResponse> responses = new ConcurrentHashMap<DiscoveryNode, PingResponse>();
|
private ConcurrentMap<DiscoveryNode, PingResponse> responses = ConcurrentCollections.newConcurrentMap();
|
||||||
|
|
||||||
private CompoundPingListener(PingListener listener, ImmutableList<? extends ZenPing> zenPings) {
|
private CompoundPingListener(PingListener listener, ImmutableList<? extends ZenPing> zenPings) {
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
|
@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.*;
|
|||||||
import org.elasticsearch.common.network.NetworkService;
|
import org.elasticsearch.common.network.NetworkService;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
@ -238,7 +239,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
final int id = pingIdGenerator.incrementAndGet();
|
final int id = pingIdGenerator.incrementAndGet();
|
||||||
receivedResponses.put(id, new ConcurrentHashMap<DiscoveryNode, PingResponse>());
|
receivedResponses.put(id, ConcurrentCollections.<DiscoveryNode, PingResponse>newConcurrentMap());
|
||||||
sendPingRequest(id);
|
sendPingRequest(id);
|
||||||
// try and send another ping request halfway through (just in case someone woke up during it...)
|
// try and send another ping request halfway through (just in case someone woke up during it...)
|
||||||
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
|
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
|
||||||
|
@ -172,7 +172,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
|
|||||||
@Override
|
@Override
|
||||||
public void ping(final PingListener listener, final TimeValue timeout) throws ElasticSearchException {
|
public void ping(final PingListener listener, final TimeValue timeout) throws ElasticSearchException {
|
||||||
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingIdGenerator.incrementAndGet());
|
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingIdGenerator.incrementAndGet());
|
||||||
receivedResponses.put(sendPingsHandler.id(), new ConcurrentHashMap<DiscoveryNode, PingResponse>());
|
receivedResponses.put(sendPingsHandler.id(), ConcurrentCollections.<DiscoveryNode, PingResponse>newConcurrentMap());
|
||||||
sendPings(timeout, null, sendPingsHandler);
|
sendPings(timeout, null, sendPingsHandler);
|
||||||
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() {
|
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -23,6 +23,7 @@ import org.apache.lucene.index.IndexCommit;
|
|||||||
import org.apache.lucene.index.IndexDeletionPolicy;
|
import org.apache.lucene.index.IndexDeletionPolicy;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.inject.name.Named;
|
import org.elasticsearch.common.inject.name.Named;
|
||||||
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||||
import org.elasticsearch.index.shard.IndexShardComponent;
|
import org.elasticsearch.index.shard.IndexShardComponent;
|
||||||
|
|
||||||
@ -44,7 +45,7 @@ public class SnapshotDeletionPolicy extends AbstractIndexShardComponent implemen
|
|||||||
|
|
||||||
private final IndexDeletionPolicy primary;
|
private final IndexDeletionPolicy primary;
|
||||||
|
|
||||||
private ConcurrentMap<Long, SnapshotHolder> snapshots = new ConcurrentHashMap<Long, SnapshotHolder>();
|
private ConcurrentMap<Long, SnapshotHolder> snapshots = ConcurrentCollections.newConcurrentMap();
|
||||||
|
|
||||||
private volatile List<SnapshotIndexCommit> commits;
|
private volatile List<SnapshotIndexCommit> commits;
|
||||||
|
|
||||||
|
@ -39,6 +39,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.index.VersionType;
|
import org.elasticsearch.index.VersionType;
|
||||||
import org.elasticsearch.index.analysis.AnalysisService;
|
import org.elasticsearch.index.analysis.AnalysisService;
|
||||||
import org.elasticsearch.index.cache.bloom.BloomCache;
|
import org.elasticsearch.index.cache.bloom.BloomCache;
|
||||||
@ -193,7 +194,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||||||
this.bloomCache = bloomCache;
|
this.bloomCache = bloomCache;
|
||||||
|
|
||||||
this.indexConcurrency = indexSettings.getAsInt("index.index_concurrency", IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);
|
this.indexConcurrency = indexSettings.getAsInt("index.index_concurrency", IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);
|
||||||
this.versionMap = new ConcurrentHashMap<String, VersionValue>();
|
this.versionMap = ConcurrentCollections.newConcurrentMap();
|
||||||
this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough...
|
this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough...
|
||||||
for (int i = 0; i < dirtyLocks.length; i++) {
|
for (int i = 0; i < dirtyLocks.length; i++) {
|
||||||
dirtyLocks[i] = new Object();
|
dirtyLocks[i] = new Object();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user