Mirror of https://github.com/honeymoose/OpenSearch.git, synced 2025-02-19 19:35:02 +00:00

* Just removing some obviously unused things

parent 542e2c55f6
commit 28b771f5db
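Most of the hunks below make the same two cleanups: StubbableConnectionManager and TestSearchContext lose a ThreadPool argument that the commit treats as unused. An illustrative before/after sketch of a typical call site, paraphrased from the hunks that follow (the variable names delegate, transport, threadPool, bigArrays and indexService are placeholders, not lines from the diff):

    // Before: call sites passed a ThreadPool the helpers did not need.
    StubbableConnectionManager before =
        new StubbableConnectionManager(delegate, Settings.EMPTY, transport, threadPool);
    TestSearchContext beforeContext = new TestSearchContext(threadPool, bigArrays, indexService);

    // After: the unused parameter is gone and the call sites shrink accordingly.
    StubbableConnectionManager after =
        new StubbableConnectionManager(delegate, Settings.EMPTY, transport);
    TestSearchContext afterContext = new TestSearchContext(bigArrays, indexService);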
@@ -213,7 +213,7 @@ public class PeerFinderTests extends ESTestCase {
         ConnectionManager innerConnectionManager
             = new ConnectionManager(settings, capturingTransport);
         StubbableConnectionManager connectionManager
-            = new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport, deterministicTaskQueue.getThreadPool());
+            = new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport);
         connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> {
             final boolean isConnected = connectedNodes.contains(discoveryNode);
             final boolean isDisconnected = disconnectedNodes.contains(discoveryNode);
@@ -55,7 +55,7 @@ public class SearchSlowLogTests extends ESSingleNodeTestCase {
     protected SearchContext createSearchContext(IndexService indexService) {
         BigArrays bigArrays = indexService.getBigArrays();
         ThreadPool threadPool = indexService.getThreadPool();
-        return new TestSearchContext(threadPool, bigArrays, indexService) {
+        return new TestSearchContext(bigArrays, indexService) {
             final ShardSearchRequest request = new ShardSearchRequest() {
                 private SearchSourceBuilder searchSourceBuilder;
                 @Override
@@ -508,8 +508,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
             };

             ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport);
-            StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport,
-                threadPool);
+            StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport);

             connectionManager.addConnectBehavior(seedNode.getAddress(), (cm, discoveryNode) -> {
                 if (discoveryNode == seedNode) {
@@ -1162,8 +1161,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
             };

             ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport);
-            StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport,
-                threadPool);
+            StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport);

             connectionManager.addNodeConnectedBehavior(connectedNode.getAddress(), (cm, discoveryNode)
                 -> discoveryNode.equals(connectedNode));
@@ -793,7 +793,7 @@ public abstract class AbstractQueryTestCase<QB extends AbstractQueryBuilder<QB>>
         return query;
     }

-    protected QueryBuilder rewriteAndFetch(QueryBuilder builder, QueryRewriteContext context) throws IOException {
+    protected QueryBuilder rewriteAndFetch(QueryBuilder builder, QueryRewriteContext context) {
         PlainActionFuture<QueryBuilder> future = new PlainActionFuture<>();
         Rewriteable.rewriteAndFetch(builder, context, future);
         return future.actionGet();
@@ -67,17 +67,6 @@ public class BackgroundIndexer implements AutoCloseable {
     volatile int minFieldSize = 10;
     volatile int maxFieldSize = 140;

-    /**
-     * Start indexing in the background using a random number of threads.
-     *
-     * @param index  index name to index into
-     * @param type   document type
-     * @param client client to use
-     */
-    public BackgroundIndexer(String index, String type, Client client) {
-        this(index, type, client, -1);
-    }
-
     /**
      * Start indexing in the background using a random number of threads. Indexing will be paused after numOfDocs docs has
      * been indexed.
@@ -239,11 +228,6 @@ public class BackgroundIndexer implements AutoCloseable {

     }

-    /** Start indexing with no limit to the number of documents */
-    public void start() {
-        start(-1);
-    }
-
     /**
      * Start indexing
      *
@@ -261,11 +245,6 @@ public class BackgroundIndexer implements AutoCloseable {
         setBudget(0);
     }

-    /** Continue indexing after it has paused. No new document limit will be set */
-    public void continueIndexing() {
-        continueIndexing(-1);
-    }
-
     /**
      * Continue indexing after it has paused.
      *
@@ -299,16 +278,6 @@ public class BackgroundIndexer implements AutoCloseable {
         Assert.assertThat(failures, emptyIterable());
     }

-    /** the minimum size in code points of a payload field in the indexed documents */
-    public void setMinFieldSize(int fieldSize) {
-        minFieldSize = fieldSize;
-    }
-
-    /** the minimum size in code points of a payload field in the indexed documents */
-    public void setMaxFieldSize(int fieldSize) {
-        maxFieldSize = fieldSize;
-    }
-
     public void setAssertNoFailuresOnStop(final boolean assertNoFailuresOnStop) {
         this.assertNoFailuresOnStop = assertNoFailuresOnStop;
     }
@@ -51,7 +51,6 @@ import org.elasticsearch.node.NodeValidationException;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.internal.SearchContext;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;

@@ -312,8 +311,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
      */
     protected SearchContext createSearchContext(IndexService indexService) {
         BigArrays bigArrays = indexService.getBigArrays();
-        ThreadPool threadPool = indexService.getThreadPool();
-        return new TestSearchContext(threadPool, bigArrays, indexService);
+        return new TestSearchContext(bigArrays, indexService);
     }

     /**
@@ -27,7 +27,6 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
-import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.fielddata.IndexFieldData;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.MapperService;
@@ -57,7 +56,6 @@ import org.elasticsearch.search.query.QuerySearchResult;
 import org.elasticsearch.search.rescore.RescoreContext;
 import org.elasticsearch.search.sort.SortAndFormats;
 import org.elasticsearch.search.suggest.SuggestionSearchContext;
-import org.elasticsearch.threadpool.ThreadPool;

 import java.util.Collections;
 import java.util.HashMap;
@@ -69,7 +67,6 @@ public class TestSearchContext extends SearchContext {
     final BigArrays bigArrays;
     final IndexService indexService;
     final BitsetFilterCache fixedBitSetFilterCache;
-    final ThreadPool threadPool;
     final Map<Class<?>, Collector> queryCollectors = new HashMap<>();
     final IndexShard indexShard;
     final QuerySearchResult queryResult = new QuerySearchResult();
@@ -92,11 +89,10 @@ public class TestSearchContext extends SearchContext {
     private final long originNanoTime = System.nanoTime();
     private final Map<String, SearchExtBuilder> searchExtBuilders = new HashMap<>();

-    public TestSearchContext(ThreadPool threadPool, BigArrays bigArrays, IndexService indexService) {
+    public TestSearchContext(BigArrays bigArrays, IndexService indexService) {
         this.bigArrays = bigArrays.withCircuitBreaking();
         this.indexService = indexService;
         this.fixedBitSetFilterCache = indexService.cache().bitsetFilterCache();
-        this.threadPool = threadPool;
         this.indexShard = indexService.getShardOrNull(0);
         queryShardContext = indexService.newQueryShardContext(0, null, () -> 0L, null);
     }
@@ -108,7 +104,6 @@ public class TestSearchContext extends SearchContext {
     public TestSearchContext(QueryShardContext queryShardContext, IndexShard indexShard) {
         this.bigArrays = null;
         this.indexService = null;
-        this.threadPool = null;
         this.fixedBitSetFilterCache = null;
         this.indexShard = indexShard;
         this.queryShardContext = queryShardContext;
@@ -267,10 +262,6 @@ public class TestSearchContext extends SearchContext {
         return searcher;
     }

-    public void setSearcher(Engine.Searcher searcher) {
-        this.searcher = new ContextIndexSearcher(searcher, indexService.cache().query(), indexShard.getQueryCachingPolicy());
-    }
-
     @Override
     public IndexShard indexShard() {
         return indexShard;
@@ -1,110 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.test.discovery;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.discovery.zen.PingContextProvider;
-import org.elasticsearch.discovery.zen.ZenPing;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Consumer;
-
-/**
- * A {@link ZenPing} implementation which returns results based on a static in-memory map. This allows pinging
- * to be immediate and can be used to speed up tests.
- */
-public final class MockZenPing implements ZenPing {
-
-    private static final Logger logger = LogManager.getLogger(MockZenPing.class);
-
-    static final Map<ClusterName, Set<MockZenPing>> activeNodesPerCluster = new HashMap<>();
-
-    /** a set of the last discovered pings. used to throttle busy spinning where MockZenPing will keep returning the same results */
-    private Set<MockZenPing> lastDiscoveredPings = null;
-
-    private final PingContextProvider contextProvider;
-
-    public MockZenPing(PingContextProvider contextProvider) {
-        this.contextProvider = contextProvider;
-    }
-
-    @Override
-    public void start() {
-        synchronized (activeNodesPerCluster) {
-            boolean added = getActiveNodesForCurrentCluster().add(this);
-            assert added;
-            activeNodesPerCluster.notifyAll();
-        }
-    }
-
-    @Override
-    public void ping(Consumer<PingCollection> resultsConsumer, TimeValue timeout) {
-        logger.info("pinging using mock zen ping");
-        synchronized (activeNodesPerCluster) {
-            Set<MockZenPing> activeNodes = getActiveNodesForCurrentCluster();
-            if (activeNodes.equals(lastDiscoveredPings)) {
-                try {
-                    logger.trace("nothing has changed since the last ping. waiting for a change");
-                    activeNodesPerCluster.wait(timeout.millis());
-                } catch (InterruptedException e) {
-
-                }
-                activeNodes = getActiveNodesForCurrentCluster();
-            }
-            lastDiscoveredPings = activeNodes;
-            PingCollection pingCollection = new PingCollection();
-            activeNodes.stream()
-                .filter(p -> p != this) // remove this as pings are not expected to return the local node
-                .map(MockZenPing::getPingResponse)
-                .forEach(pingCollection::addPing);
-            resultsConsumer.accept(pingCollection);
-        }
-    }
-
-    private ClusterName getClusterName() {
-        return contextProvider.clusterState().getClusterName();
-    }
-
-    private PingResponse getPingResponse() {
-        final ClusterState clusterState = contextProvider.clusterState();
-        return new PingResponse(clusterState.nodes().getLocalNode(), clusterState.nodes().getMasterNode(), clusterState);
-    }
-
-    private Set<MockZenPing> getActiveNodesForCurrentCluster() {
-        assert Thread.holdsLock(activeNodesPerCluster);
-        return activeNodesPerCluster.computeIfAbsent(getClusterName(),
-            clusterName -> ConcurrentCollections.newConcurrentSet());
-    }
-
-    @Override
-    public void close() {
-        synchronized (activeNodesPerCluster) {
-            boolean found = getActiveNodesForCurrentCluster().remove(this);
-            assert found;
-            activeNodesPerCluster.notifyAll();
-        }
-    }
-}
@@ -30,12 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;

 public class BlockClusterStateProcessing extends SingleNodeDisruption {

-    AtomicReference<CountDownLatch> disruptionLatch = new AtomicReference<>();
+    private final AtomicReference<CountDownLatch> disruptionLatch = new AtomicReference<>();

-
-    public BlockClusterStateProcessing(Random random) {
-        this(null, random);
-    }
-
     public BlockClusterStateProcessing(String disruptedNode, Random random) {
         super(random);
@@ -280,10 +280,6 @@ public class ElasticsearchAssertions {
         assertSearchHit(searchResponse, 4, matcher);
     }

-    public static void assertFifthHit(SearchResponse searchResponse, Matcher<SearchHit> matcher) {
-        assertSearchHit(searchResponse, 5, matcher);
-    }
-
     public static void assertSearchHit(SearchResponse searchResponse, int number, Matcher<SearchHit> matcher) {
         assertThat(number, greaterThan(0));
         assertThat("SearchHit number must be greater than 0", number, greaterThan(0));
@@ -657,14 +653,6 @@ public class ElasticsearchAssertions {
         assertThat("file/dir [" + file + "] should not exist.", Files.exists(file), is(false));
     }

-    /**
-     * Check if a directory exists
-     */
-    public static void assertDirectoryExists(Path dir) {
-        assertFileExists(dir);
-        assertThat("file [" + dir + "] should be a directory.", Files.isDirectory(dir), is(true));
-    }
-
     /**
      * Asserts that the provided {@link BytesReference}s created through
      * {@link org.elasticsearch.common.xcontent.ToXContent#toXContent(XContentBuilder, ToXContent.Params)} hold the same content.
@@ -181,7 +181,7 @@ public class MockFSDirectoryService extends FsDirectoryService {
     }
 }

-final class CloseableDirectory implements Closeable {
+static final class CloseableDirectory implements Closeable {
     private final BaseDirectoryWrapper dir;
     private final TestRuleMarkFailure failureMarker;

@@ -81,7 +81,7 @@ public class MockTransport implements Transport, LifecycleComponent {
                                            Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
                                            @Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
         StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this),
-            settings, this, threadPool);
+            settings, this);
         connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode));
         connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode));
         return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
@@ -157,7 +157,7 @@ public final class MockTransportService extends TransportService {
                                  Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
                                  @Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
         super(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
-            new StubbableConnectionManager(new ConnectionManager(settings, transport), settings, transport, threadPool));
+            new StubbableConnectionManager(new ConnectionManager(settings, transport), settings, transport));
         this.original = transport.getDelegate();
     }

@@ -22,7 +22,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.ConnectTransportException;
 import org.elasticsearch.transport.ConnectionManager;
 import org.elasticsearch.transport.ConnectionProfile;
@@ -41,7 +40,7 @@ public class StubbableConnectionManager extends ConnectionManager {
     private volatile GetConnectionBehavior defaultGetConnectionBehavior = ConnectionManager::getConnection;
     private volatile NodeConnectedBehavior defaultNodeConnectedBehavior = ConnectionManager::nodeConnected;

-    public StubbableConnectionManager(ConnectionManager delegate, Settings settings, Transport transport, ThreadPool threadPool) {
+    public StubbableConnectionManager(ConnectionManager delegate, Settings settings, Transport transport) {
         super(settings, transport);
         this.delegate = delegate;
         this.getConnectionBehaviors = new ConcurrentHashMap<>();