Merge branch 'master' into ccr
* master:
  Remove unnecessary exception for engine constructor
  Update docs about `script` parameter (#27010)
  Don't refresh on `_flush` `_force_merge` and `_upgrade` (#27000)
  Do not set SO_LINGER on server channels (#26997)
  Fix inconsistencies in the rest api specs for *_script (#26971)
  fix inconsistencies in the rest api specs for cat.snapshots (#26996)
  Add docs on full_id parameter in cat nodes API
  [TEST] Add test that replicates versioned updates with random flushes
  Use internal searcher for all indexing related operations in the engine
  Reformat paragraph in template docs to 80 columns
  Clarify settings and template on create index
  Fix reference to TcpTransport in documentation
  Allow Uid#decodeId to decode from a byte array slice (#26987)
  Fix a typo in the similarity docs (#26970)
  Use separate searchers for "search visibility" vs "move indexing buffer to disk (#26972)
commit 8a3540c366

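The most invasive change pulled in here is #26972, which splits the engine's single SearcherManager into an internal manager (realtime gets, version lookups, writing the indexing buffer) and an external one that only advances on an explicit refresh. A minimal sketch of the resulting call pattern, illustrative only and assuming an Engine instance named `engine`:

    // internal searchers see segments as soon as they are written
    try (Engine.Searcher get = engine.acquireSearcher("doc_lookup", Engine.SearcherScope.INTERNAL)) {
        // resolve ids/versions without forcing external visibility
    }
    // external searchers only move forward on an EXTERNAL refresh
    try (Engine.Searcher search = engine.acquireSearcher("search", Engine.SearcherScope.EXTERNAL)) {
        // what the search API sees
    }
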
@@ -305,9 +305,9 @@ public class TermVectorsResponse extends ActionResponse implements ToXContentObj
         long sumDocFreq = curTerms.getSumDocFreq();
         int docCount = curTerms.getDocCount();
         long sumTotalTermFrequencies = curTerms.getSumTotalTermFreq();
-        if (docCount > 0) {
-            assert ((sumDocFreq > 0)) : "docCount >= 0 but sumDocFreq ain't!";
-            assert ((sumTotalTermFrequencies > 0)) : "docCount >= 0 but sumTotalTermFrequencies ain't!";
+        if (docCount >= 0) {
+            assert ((sumDocFreq >= 0)) : "docCount >= 0 but sumDocFreq ain't!";
+            assert ((sumTotalTermFrequencies >= 0)) : "docCount >= 0 but sumTotalTermFrequencies ain't!";
             builder.startObject(FieldStrings.FIELD_STATISTICS);
             builder.field(FieldStrings.SUM_DOC_FREQ, sumDocFreq);
             builder.field(FieldStrings.DOC_COUNT, docCount);

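For orientation, the `field_statistics` object these `builder.field(...)` calls produce looks roughly like the following in a term vectors response (values illustrative; the key for the sum of total term frequencies is assumed to be `sum_ttf`):

    "field_statistics": {
        "sum_doc_freq": 120,
        "doc_count": 10,
        "sum_ttf": 130
    }
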
@@ -90,7 +90,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
+import java.util.function.BiFunction;

 public abstract class Engine implements Closeable {

@@ -465,8 +465,9 @@ public abstract class Engine implements Closeable {
         PENDING_OPERATIONS
     }

-    protected final GetResult getFromSearcher(Get get, Function<String, Searcher> searcherFactory) throws EngineException {
-        final Searcher searcher = searcherFactory.apply("get");
+    protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory,
+                                              SearcherScope scope) throws EngineException {
+        final Searcher searcher = searcherFactory.apply("get", scope);
         final DocIdAndVersion docIdAndVersion;
         try {
             docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid());
@@ -494,23 +495,40 @@ public abstract class Engine implements Closeable {
         }
     }

-    public abstract GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException;
+    public abstract GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException;

     /**
      * Returns a new searcher instance. The consumer of this
      * API is responsible for releasing the returned searcher in a
      * safe manner, preferably in a try/finally block.
      *
      * @param source the source API or routing that triggers this searcher acquire
      *
      * @see Searcher#close()
      */
     public final Searcher acquireSearcher(String source) throws EngineException {
+        return acquireSearcher(source, SearcherScope.EXTERNAL);
+    }
+
+    /**
+     * Returns a new searcher instance. The consumer of this
+     * API is responsible for releasing the returned searcher in a
+     * safe manner, preferably in a try/finally block.
+     *
+     * @param source the source API or routing that triggers this searcher acquire
+     * @param scope the scope of this searcher ie. if the searcher will be used for get or search purposes
+     *
+     * @see Searcher#close()
+     */
+    public final Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
         boolean success = false;
         /* Acquire order here is store -> manager since we need
          * to make sure that the store is not closed before
          * the searcher is acquired. */
         store.incRef();
         try {
-            final SearcherManager manager = getSearcherManager(); // can never be null
+            final SearcherManager manager = getSearcherManager(source, scope); // can never be null
             /* This might throw NPE but that's fine we will run ensureOpen()
              * in the catch block and throw the right exception */
             final IndexSearcher searcher = manager.acquire();
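The javadoc above asks the consumer to release the searcher in a try/finally block. A sketch of that discipline for callers that cannot use try-with-resources (hypothetical caller code; the searcher is released via `Searcher#close()`):

    Searcher searcher = engine.acquireSearcher("my_source", SearcherScope.INTERNAL);
    try {
        // use searcher.reader() / searcher.searcher()
    } finally {
        searcher.close(); // always release, even on exception
    }
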
@@ -536,6 +554,10 @@ public abstract class Engine implements Closeable {
         }
     }

+    public enum SearcherScope {
+        EXTERNAL, INTERNAL
+    }
+
     /** returns the translog for this engine */
     public abstract Translog getTranslog();

@@ -768,7 +790,7 @@ public abstract class Engine implements Closeable {
           the store is closed so we need to make sure we increment it here
          */
        try {
-           return getSearcherManager().isSearcherCurrent() == false;
+           return getSearcherManager("refresh_needed", SearcherScope.EXTERNAL).isSearcherCurrent() == false;
        } catch (IOException e) {
            logger.error("failed to access searcher manager", e);
            failEngine("failed to access searcher manager", e);

@@ -1306,7 +1328,7 @@ public abstract class Engine implements Closeable {
        }
    }

-    protected abstract SearcherManager getSearcherManager();
+    protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope);

    /**
     * Method to close the engine while the write lock is held.

@@ -93,7 +93,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Function;
+import java.util.function.BiFunction;
 import java.util.function.LongSupplier;

 public class InternalEngine extends Engine {

@@ -108,20 +108,18 @@ public class InternalEngine extends Engine {

     private final IndexWriter indexWriter;

-    private final SearcherFactory searcherFactory;
-    private final SearcherManager searcherManager;
+    private final SearcherManager externalSearcherManager;
+    private final SearcherManager internalSearcherManager;

     private final Lock flushLock = new ReentrantLock();
     private final ReentrantLock optimizeLock = new ReentrantLock();

     // A uid (in the form of BytesRef) to the version map
     // we use the hashed variant since we iterate over it and check removal and additions on existing keys
-    private final LiveVersionMap versionMap;
+    private final LiveVersionMap versionMap = new LiveVersionMap();

     private final KeyedLock<BytesRef> keyedLock = new KeyedLock<>();

-    private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();
-
     private volatile SegmentInfos lastCommittedSegmentInfos;

     private final IndexThrottle throttle;

@@ -146,14 +144,13 @@ public class InternalEngine extends Engine {
     @Nullable
     private final String historyUUID;

-    public InternalEngine(EngineConfig engineConfig) throws EngineException {
+    public InternalEngine(EngineConfig engineConfig) {
         super(engineConfig);
         openMode = engineConfig.getOpenMode();
         if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
             maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
         }
         this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
-        this.versionMap = new LiveVersionMap();
         final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
             engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
             engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()

@@ -163,7 +160,8 @@ public class InternalEngine extends Engine {
         store.incRef();
         IndexWriter writer = null;
         Translog translog = null;
-        SearcherManager manager = null;
+        SearcherManager externalSearcherManager = null;
+        SearcherManager internalSearcherManager = null;
         EngineMergeScheduler scheduler = null;
         boolean success = false;
         try {

@@ -171,7 +169,6 @@ public class InternalEngine extends Engine {

             mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
             throttle = new IndexThrottle();
-            this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
             try {
                 final SeqNoStats seqNoStats;
                 switch (openMode) {

@@ -215,20 +212,21 @@ public class InternalEngine extends Engine {
                     throw e;
                 }
             }
-            manager = createSearcherManager();
-            this.searcherManager = manager;
-            this.versionMap.setManager(searcherManager);
+            internalSearcherManager = createSearcherManager(new SearcherFactory(), false);
+            externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig), true);
+            this.internalSearcherManager = internalSearcherManager;
+            this.externalSearcherManager = externalSearcherManager;
+            internalSearcherManager.addListener(versionMap);
             assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
             // don't allow commits until we are done with recovering
             pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
             for (ReferenceManager.RefreshListener listener: engineConfig.getRefreshListeners()) {
-                searcherManager.addListener(listener);
+                this.externalSearcherManager.addListener(listener);
             }
             success = true;
         } finally {
             if (success == false) {
-                IOUtils.closeWhileHandlingException(writer, translog, manager, scheduler);
-                versionMap.clear();
+                IOUtils.closeWhileHandlingException(writer, translog, externalSearcherManager, internalSearcherManager, scheduler);
                 if (isClosed.get() == false) {
                     // failure we need to dec the store reference
                     store.decRef();
@@ -345,6 +343,7 @@ public class InternalEngine extends Engine {
             logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
                 opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration());
             flush(true, true);
+            refresh("translog_recovery");
         } else if (translog.isCurrent(translogGeneration) == false) {
             commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
             refreshLastCommittedSegmentInfos();

@@ -441,14 +440,16 @@ public class InternalEngine extends Engine {
         return uuid;
     }

-    private SearcherManager createSearcherManager() throws EngineException {
+    private SearcherManager createSearcherManager(SearcherFactory searcherFactory, boolean readSegmentsInfo) throws EngineException {
         boolean success = false;
         SearcherManager searcherManager = null;
         try {
             try {
                 final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
                 searcherManager = new SearcherManager(directoryReader, searcherFactory);
-                lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);
+                if (readSegmentsInfo) {
+                    lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);
+                }
                 success = true;
                 return searcherManager;
             } catch (IOException e) {

@@ -468,10 +469,11 @@ public class InternalEngine extends Engine {
     }

     @Override
-    public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException {
+    public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
         assert Objects.equals(get.uid().field(), uidField) : get.uid().field();
         try (ReleasableLock ignored = readLock.acquire()) {
             ensureOpen();
+            SearcherScope scope;
             if (get.realtime()) {
                 VersionValue versionValue = versionMap.getUnderLock(get.uid());
                 if (versionValue != null) {

@@ -482,12 +484,16 @@ public class InternalEngine extends Engine {
                         throw new VersionConflictEngineException(shardId, get.type(), get.id(),
                             get.versionType().explainConflictForReads(versionValue.version, get.version()));
                     }
-                    refresh("realtime_get");
+                    refresh("realtime_get", SearcherScope.INTERNAL);
                 }
+                scope = SearcherScope.INTERNAL;
+            } else {
+                // we expose what has been externally expose in a point in time snapshot via an explicit refresh
+                scope = SearcherScope.EXTERNAL;
             }

             // no version, get the version from the index, we know that we refresh on flush
-            return getFromSearcher(get, searcherFactory);
+            return getFromSearcher(get, searcherFactory, scope);
         }
     }

@@ -519,7 +525,7 @@ public class InternalEngine extends Engine {
         } else {
             // load from index
             assert incrementIndexVersionLookup();
-            try (Searcher searcher = acquireSearcher("load_seq_no")) {
+            try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {
                 DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
                 if (docAndSeqNo == null) {
                     status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;

@@ -946,7 +952,7 @@ public class InternalEngine extends Engine {
                 throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")");
             }
         } else {
-            try (Searcher searcher = acquireSearcher("assert doc doesn't exist")) {
+            try (Searcher searcher = acquireSearcher("assert doc doesn't exist", SearcherScope.INTERNAL)) {
                 final long docsWithId = searcher.searcher().count(new TermQuery(index.uid()));
                 if (docsWithId > 0) {
                     throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists [" + docsWithId + "] times in index");

@@ -1187,17 +1193,34 @@ public class InternalEngine extends Engine {

     @Override
     public void refresh(String source) throws EngineException {
+        refresh(source, SearcherScope.EXTERNAL);
+    }
+
+    final void refresh(String source, SearcherScope scope) throws EngineException {
         // we obtain a read lock here, since we don't want a flush to happen while we are refreshing
         // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
         try (ReleasableLock lock = readLock.acquire()) {
             ensureOpen();
-            searcherManager.maybeRefreshBlocking();
+            switch (scope) {
+                case EXTERNAL:
+                    // even though we maintain 2 managers we really do the heavy-lifting only once.
+                    // the second refresh will only do the extra work we have to do for warming caches etc.
+                    externalSearcherManager.maybeRefreshBlocking();
+                    // the break here is intentional we never refresh both internal / external together
+                    break;
+                case INTERNAL:
+                    internalSearcherManager.maybeRefreshBlocking();
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("unknown scope: " + scope);
+            }
         } catch (AlreadyClosedException e) {
             failOnTragicEvent(e);
             throw e;
         } catch (Exception e) {
             try {
-                failEngine("refresh failed", e);
+                failEngine("refresh failed source[" + source + "]", e);
             } catch (Exception inner) {
                 e.addSuppressed(inner);
             }
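To make the scope split concrete: only an EXTERNAL refresh advances what searches see, while INTERNAL refreshes exist to release heap (version map entries, unreferenced segments) without the cache-warming work. A hedged sketch of the two kinds of call sites this diff introduces inside the engine:

    refresh("api");                                          // public overload, EXTERNAL scope: search visibility
    refresh("version_table_flush", SearcherScope.INTERNAL);  // internal only: clears old version values cheaply
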
@@ -1208,36 +1231,20 @@ public class InternalEngine extends Engine {
         // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
         // for a long time:
         maybePruneDeletedTombstones();
-        versionMapRefreshPending.set(false);
         mergeScheduler.refreshConfig();
     }

     @Override
     public void writeIndexingBuffer() throws EngineException {
-
         // we obtain a read lock here, since we don't want a flush to happen while we are writing
         // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
         try (ReleasableLock lock = readLock.acquire()) {
             ensureOpen();
-
-            // TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two
-            // searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking
-            // refresh API), and another for version map interactions. See #15768.
             final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
             final long indexingBufferBytes = indexWriter.ramBytesUsed();
-
-            final boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes / 4 < versionMapBytes);
-            if (useRefresh) {
-                // The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears
-                logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
-                    new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
-                refresh("write indexing buffer");
-            } else {
-                // Most of our heap is used by the indexing buffer, so we do a cheaper (just writes segments, doesn't open a new searcher) IW.flush:
-                logger.debug("use IndexWriter.flush to write indexing buffer (heap size=[{}]) since version map is small (heap size=[{}])",
-                    new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
-                indexWriter.flush();
-            }
+            logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
+                new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
+            refresh("write indexing buffer", SearcherScope.INTERNAL);
         } catch (AlreadyClosedException e) {
             failOnTragicEvent(e);
             throw e;

@@ -1302,10 +1309,11 @@ public class InternalEngine extends Engine {
             maybeFailEngine("renew sync commit", ex);
             throw new EngineException(shardId, "failed to renew sync commit", ex);
         }
-        if (renewed) { // refresh outside of the write lock
-            refresh("renew sync commit");
+        if (renewed) {
+            // refresh outside of the write lock
+            // we have to refresh internal searcher here to ensure we release unreferenced segments.
+            refresh("renew sync commit", SearcherScope.INTERNAL);
         }

         return renewed;
     }

@@ -1347,7 +1355,7 @@ public class InternalEngine extends Engine {
             commitIndexWriter(indexWriter, translog, null);
             logger.trace("finished commit for flush");
             // we need to refresh in order to clear older version values
-            refresh("version_table_flush");
+            refresh("version_table_flush", SearcherScope.INTERNAL);
             translog.trimUnreferencedReaders();
         } catch (Exception e) {
             throw new FlushFailedEngineException(shardId, e);

@@ -1651,8 +1659,11 @@ public class InternalEngine extends Engine {
         assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself";
         try {
             this.versionMap.clear();
+            if (internalSearcherManager != null) {
+                internalSearcherManager.removeListener(versionMap);
+            }
             try {
-                IOUtils.close(searcherManager);
+                IOUtils.close(externalSearcherManager, internalSearcherManager);
             } catch (Exception e) {
                 logger.warn("Failed to close SearcherManager", e);
             }

@@ -1684,8 +1695,15 @@ public class InternalEngine extends Engine {
     }

     @Override
-    protected SearcherManager getSearcherManager() {
-        return searcherManager;
+    protected SearcherManager getSearcherManager(String source, SearcherScope scope) {
+        switch (scope) {
+            case INTERNAL:
+                return internalSearcherManager;
+            case EXTERNAL:
+                return externalSearcherManager;
+            default:
+                throw new IllegalStateException("unknown scope: " + scope);
+        }
     }

     private Releasable acquireLock(BytesRef uid) {

@@ -1698,7 +1716,7 @@ public class InternalEngine extends Engine {

     private long loadCurrentVersionFromIndex(Term uid) throws IOException {
         assert incrementIndexVersionLookup();
-        try (Searcher searcher = acquireSearcher("load_version")) {
+        try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) {
             return VersionsAndSeqNoResolver.loadVersion(searcher.reader(), uid);
         }
     }

@@ -59,8 +59,6 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {

     private volatile Maps maps = new Maps();

-    private ReferenceManager<?> mgr;
-
     /** Bytes consumed for each BytesRef UID:
      * In this base value, we account for the {@link BytesRef} object itself as
      * well as the header of the byte[] array it holds, and some lost bytes due

@@ -98,21 +96,6 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
     /** Tracks bytes used by tombstones (deletes) */
     final AtomicLong ramBytesUsedTombstones = new AtomicLong();

-    /** Sync'd because we replace old mgr. */
-    synchronized void setManager(ReferenceManager<?> newMgr) {
-        if (mgr != null) {
-            mgr.removeListener(this);
-        }
-        mgr = newMgr;
-
-        // In case InternalEngine closes & opens a new IndexWriter/SearcherManager, all deletes are made visible, so we clear old and
-        // current here. This is safe because caller holds writeLock here (so no concurrent adds/deletes can be happeninge):
-        maps = new Maps();
-
-        // So we are notified when reopen starts and finishes
-        mgr.addListener(this);
-    }
-
     @Override
     public void beforeRefresh() throws IOException {
         // Start sending all updates after this point to the new

@@ -249,11 +232,6 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
         // and this will lead to an assert trip. Presumably it's fine if our ramBytesUsedTombstones is non-zero after clear since the index
         // is being closed:
         //ramBytesUsedTombstones.set(0);
-
-        if (mgr != null) {
-            mgr.removeListener(this);
-            mgr = null;
-        }
     }

     @Override

@@ -135,36 +135,36 @@ public final class Uid {
(indentation-only hunk: with leading whitespace stripped by extraction both sides read identically, so the block is shown once)
        // 'xxx=' and 'xxx' could be considered the same id
        final int length = id.length();
        switch (length & 0x03) {
            case 0:
                break;
            case 1:
                return false;
            case 2:
                // the last 2 symbols (12 bits) are encoding 1 byte (8 bits)
                // so the last symbol only actually uses 8-6=2 bits and can only take 4 values
                char last = id.charAt(length - 1);
                if (last != 'A' && last != 'Q' && last != 'g' && last != 'w') {
                    return false;
                }
                break;
            case 3:
                // The last 3 symbols (18 bits) are encoding 2 bytes (16 bits)
                // so the last symbol only actually uses 16-12=4 bits and can only take 16 values
                last = id.charAt(length - 1);
                if (last != 'A' && last != 'E' && last != 'I' && last != 'M' && last != 'Q'&& last != 'U'&& last != 'Y'
                        && last != 'c'&& last != 'g'&& last != 'k' && last != 'o' && last != 's' && last != 'w'
                        && last != '0' && last != '4' && last != '8') {
                    return false;
                }
                break;
            default:
                // number & 0x03 is always in [0,3]
                throw new AssertionError("Impossible case");
        }
        for (int i = 0; i < length; ++i) {
            final char c = id.charAt(i);
            final boolean allowed =
                (c >= '0' && c <= '9') ||
                (c >= 'A' && c <= 'Z') ||
                (c >= 'a' && c <= 'z') ||
                c == '-' || c == '_';

@@ -244,16 +244,16 @@ public final class Uid {
        }
    }

-    private static String decodeNumericId(byte[] idBytes) {
-        assert Byte.toUnsignedInt(idBytes[0]) == NUMERIC;
-        int length = (idBytes.length - 1) * 2;
+    private static String decodeNumericId(byte[] idBytes, int offset, int len) {
+        assert Byte.toUnsignedInt(idBytes[offset]) == NUMERIC;
+        int length = (len - 1) * 2;
         char[] chars = new char[length];
-        for (int i = 1; i < idBytes.length; ++i) {
-            final int b = Byte.toUnsignedInt(idBytes[i]);
+        for (int i = 1; i < len; ++i) {
+            final int b = Byte.toUnsignedInt(idBytes[offset + i]);
             final int b1 = (b >>> 4);
             final int b2 = b & 0x0f;
             chars[(i - 1) * 2] = (char) (b1 + '0');
-            if (i == idBytes.length - 1 && b2 == 0x0f) {
+            if (i == len - 1 && b2 == 0x0f) {
                 length--;
                 break;
             }

@@ -262,15 +262,17 @@ public final class Uid {
         return new String(chars, 0, length);
     }

-    private static String decodeUtf8Id(byte[] idBytes) {
-        assert Byte.toUnsignedInt(idBytes[0]) == UTF8;
-        return new BytesRef(idBytes, 1, idBytes.length - 1).utf8ToString();
+    private static String decodeUtf8Id(byte[] idBytes, int offset, int length) {
+        assert Byte.toUnsignedInt(idBytes[offset]) == UTF8;
+        return new BytesRef(idBytes, offset + 1, length - 1).utf8ToString();
     }

-    private static String decodeBase64Id(byte[] idBytes) {
-        assert Byte.toUnsignedInt(idBytes[0]) <= BASE64_ESCAPE;
-        if (Byte.toUnsignedInt(idBytes[0]) == BASE64_ESCAPE) {
-            idBytes = Arrays.copyOfRange(idBytes, 1, idBytes.length);
+    private static String decodeBase64Id(byte[] idBytes, int offset, int length) {
+        assert Byte.toUnsignedInt(idBytes[offset]) <= BASE64_ESCAPE;
+        if (Byte.toUnsignedInt(idBytes[offset]) == BASE64_ESCAPE) {
+            idBytes = Arrays.copyOfRange(idBytes, offset + 1, offset + length);
+        } else if ((idBytes.length == length && offset == 0) == false) { // no need to copy if it's not a slice
+            idBytes = Arrays.copyOfRange(idBytes, offset, offset + length);
         }
         return Base64.getUrlEncoder().withoutPadding().encodeToString(idBytes);
     }

@@ -278,17 +280,23 @@ public final class Uid {
     /** Decode an indexed id back to its original form.
      *  @see #encodeId */
     public static String decodeId(byte[] idBytes) {
-        if (idBytes.length == 0) {
+        return decodeId(idBytes, 0, idBytes.length);
+    }
+
+    /** Decode an indexed id back to its original form.
+     *  @see #encodeId */
+    public static String decodeId(byte[] idBytes, int offset, int length) {
+        if (length == 0) {
             throw new IllegalArgumentException("Ids can't be empty");
         }
-        final int magicChar = Byte.toUnsignedInt(idBytes[0]);
+        final int magicChar = Byte.toUnsignedInt(idBytes[offset]);
         switch (magicChar) {
             case NUMERIC:
-                return decodeNumericId(idBytes);
+                return decodeNumericId(idBytes, offset, length);
             case UTF8:
-                return decodeUtf8Id(idBytes);
+                return decodeUtf8Id(idBytes, offset, length);
             default:
-                return decodeBase64Id(idBytes);
+                return decodeBase64Id(idBytes, offset, length);
         }
     }
 }

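With the new slice-aware overload, a caller holding a BytesRef no longer has to copy the id bytes out before decoding. A small round-trip sketch (hypothetical caller, mirroring what the UidTests changes below exercise):

    BytesRef encoded = Uid.encodeId("my-doc-id");
    // decode straight from the backing array slice, no Arrays.copyOfRange needed
    String id = Uid.decodeId(encoded.bytes, encoded.offset, encoded.length);
    assert "my-doc-id".equals(id);
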
@@ -1031,7 +1031,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         if (logger.isTraceEnabled()) {
             logger.trace("force merge with {}", forceMerge);
         }
-        getEngine().forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(),
+        Engine engine = getEngine();
+        engine.forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(),
             forceMerge.onlyExpungeDeletes(), false, false);
     }

@@ -1045,7 +1046,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
         org.apache.lucene.util.Version previousVersion = minimumCompatibleVersion();
         // we just want to upgrade the segments, not actually forge merge to a single segment
-        getEngine().forceMerge(true, // we need to flush at the end to make sure the upgrade is durable
+        final Engine engine = getEngine();
+        engine.forceMerge(true, // we need to flush at the end to make sure the upgrade is durable
             Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment
             false, true, upgrade.upgradeOnlyAncientSegments());
         org.apache.lucene.util.Version version = minimumCompatibleVersion();

@@ -1126,11 +1128,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         // fail the engine. This will cause this shard to also be removed from the node's index service.
         getEngine().failEngine(reason, e);
     }

     public Engine.Searcher acquireSearcher(String source) {
+        return acquireSearcher(source, Engine.SearcherScope.EXTERNAL);
+    }
+
+    private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) {
         readAllowed();
         final Engine engine = getEngine();
-        final Engine.Searcher searcher = engine.acquireSearcher(source);
+        final Engine.Searcher searcher = engine.acquireSearcher(source, scope);
         boolean success = false;
         try {
             final Engine.Searcher wrappedSearcher = searcherWrapper == null ? searcher : searcherWrapper.wrap(searcher);

@@ -41,6 +41,7 @@ public class RestPutStoredScriptAction extends BaseRestHandler {

         controller.registerHandler(POST, "/_scripts/{id}", this);
         controller.registerHandler(PUT, "/_scripts/{id}", this);
         controller.registerHandler(POST, "/_scripts/{id}/{context}", this);
+        controller.registerHandler(PUT, "/_scripts/{id}/{context}", this);
     }

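For reference, the routes registered above serve stored-script writes like the following (illustrative request; the body shape is assumed from the 6.x stored-scripts API):

    PUT /_scripts/my-script
    {
      "script": { "lang": "painless", "source": "doc['likes'].value + params.bonus" }
    }
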
@@ -905,11 +905,9 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
                 // first stop to accept any incoming connections so nobody can connect to this transport
                 for (Map.Entry<String, List<Channel>> entry : serverChannels.entrySet()) {
                     try {
-                        closeChannels(entry.getValue(), true, true);
+                        closeChannels(entry.getValue(), true, false);
                     } catch (Exception e) {
-                        logger.debug(
-                            (Supplier<?>) () -> new ParameterizedMessage(
-                                "Error closing serverChannel for profile [{}]", entry.getKey()), e);
+                        logger.warn(new ParameterizedMessage("Error closing serverChannel for profile [{}]", entry.getKey()), e);
                     }
                 }
                 // we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close

@@ -1024,9 +1022,9 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
      *
      * @param channels the channels to close
      * @param blocking whether the channels should be closed synchronously
-     * @param closingTransport whether we abort the connection on RST instead of FIN
+     * @param doNotLinger whether we abort the connection on RST instead of FIN
      */
-    protected abstract void closeChannels(List<Channel> channels, boolean blocking, boolean closingTransport) throws IOException;
+    protected abstract void closeChannels(List<Channel> channels, boolean blocking, boolean doNotLinger) throws IOException;

     /**
      * Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception

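For context on the `doNotLinger` rename: aborting on RST instead of FIN is the SO_LINGER-with-zero-timeout behaviour, which #26997 stops applying to server channels. In terms of the standard java.net API (illustrative, not this class's code):

    // SO_LINGER on with timeout 0: close() drops unsent data and resets the connection (RST)
    socket.setSoLinger(true, 0);
    // default, SO_LINGER off: close() returns promptly and the stack shuts the connection down gracefully (FIN)
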
@@ -55,6 +55,7 @@ public class IndicesSegmentsRequestTests extends ESSingleNodeTestCase {
             client().prepareIndex("test", "type1", id).setSource("text", "sometext").get();
         }
         client().admin().indices().prepareFlush("test").get();
+        client().admin().indices().prepareRefresh().get();
     }

     public void testBasic() {

@@ -131,16 +131,17 @@ public class GetActionIT extends ESIntegTestCase {
         assertThat(response.getField("field1").getValues().get(0).toString(), equalTo("value1"));
         assertThat(response.getField("field2"), nullValue());

-        logger.info("--> flush the index, so we load it from it");
-        flush();
-
-        logger.info("--> realtime get 1 (loaded from index)");
+        logger.info("--> realtime get 1");
         response = client().prepareGet(indexOrAlias(), "type1", "1").get();
         assertThat(response.isExists(), equalTo(true));
         assertThat(response.getIndex(), equalTo("test"));
         assertThat(response.getSourceAsMap().get("field1").toString(), equalTo("value1"));
         assertThat(response.getSourceAsMap().get("field2").toString(), equalTo("value2"));

+        logger.info("--> refresh the index, so we load it from it");
+        refresh();
+
         logger.info("--> non realtime get 1 (loaded from index)");
         response = client().prepareGet(indexOrAlias(), "type1", "1").setRealtime(false).get();
         assertThat(response.isExists(), equalTo(true));

@@ -86,6 +86,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;

@@ -942,7 +943,7 @@ public class InternalEngineTests extends ESTestCase {
         engine.index(indexForDoc(doc));

         final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>();
-        final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
+        final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;
         latestGetResult.set(engine.get(newGet(true, doc), searcherFactory));
         final AtomicBoolean flushFinished = new AtomicBoolean(false);
         final CyclicBarrier barrier = new CyclicBarrier(2);

@@ -977,7 +978,7 @@ public class InternalEngineTests extends ESTestCase {
         MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
         searchResult.close();

-        final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
+        final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;

         // create a document
         Document document = testDocumentWithTextField();

@@ -1002,6 +1003,12 @@ public class InternalEngineTests extends ESTestCase {
         assertThat(getResult.docIdAndVersion(), notNullValue());
         getResult.release();

+        // but not real time is not yet visible
+        getResult = engine.get(newGet(false, doc), searcherFactory);
+        assertThat(getResult.exists(), equalTo(false));
+        getResult.release();
+
+
         // refresh and it should be there
         engine.refresh("test");

@@ -1237,6 +1244,7 @@ public class InternalEngineTests extends ESTestCase {
             assertTrue(engine.tryRenewSyncCommit());
             assertEquals(1, engine.segments(false).size());
         } else {
+            engine.refresh("test");
             assertBusy(() -> assertEquals(1, engine.segments(false).size()));
         }
         assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
@@ -1306,11 +1314,87 @@ public class InternalEngineTests extends ESTestCase {
         Engine.IndexResult indexResult = engine.index(create);
         assertThat(indexResult.getVersion(), equalTo(1L));

-        create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
+        create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(),
+            create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
         indexResult = replicaEngine.index(create);
         assertThat(indexResult.getVersion(), equalTo(1L));
     }

+    public void testReplicatedVersioningWithFlush() throws IOException {
+        ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null);
+        Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED);
+        Engine.IndexResult indexResult = engine.index(create);
+        assertThat(indexResult.getVersion(), equalTo(1L));
+        assertTrue(indexResult.isCreated());
+
+
+        create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(),
+            create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
+        indexResult = replicaEngine.index(create);
+        assertThat(indexResult.getVersion(), equalTo(1L));
+        assertTrue(indexResult.isCreated());
+
+        if (randomBoolean()) {
+            engine.flush();
+        }
+        if (randomBoolean()) {
+            replicaEngine.flush();
+        }
+
+        Engine.Index update = new Engine.Index(newUid(doc), doc, 1);
+        Engine.IndexResult updateResult = engine.index(update);
+        assertThat(updateResult.getVersion(), equalTo(2L));
+        assertFalse(updateResult.isCreated());
+
+
+        update = new Engine.Index(newUid(doc), doc, updateResult.getSeqNo(), update.primaryTerm(), updateResult.getVersion(),
+            update.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
+        updateResult = replicaEngine.index(update);
+        assertThat(updateResult.getVersion(), equalTo(2L));
+        assertFalse(updateResult.isCreated());
+        replicaEngine.refresh("test");
+        try (Searcher searcher = replicaEngine.acquireSearcher("test")) {
+            assertEquals(1, searcher.getDirectoryReader().numDocs());
+        }
+
+        engine.refresh("test");
+        try (Searcher searcher = engine.acquireSearcher("test")) {
+            assertEquals(1, searcher.getDirectoryReader().numDocs());
+        }
+    }
+
+    /**
+     * simulates what an upsert / update API does
+     */
+    public void testVersionedUpdate() throws IOException {
+        final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;
+
+        ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null);
+        Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED);
+        Engine.IndexResult indexResult = engine.index(create);
+        assertThat(indexResult.getVersion(), equalTo(1L));
+        try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) {
+            assertEquals(1, get.version());
+        }
+
+        Engine.Index update_1 = new Engine.Index(newUid(doc), doc, 1);
+        Engine.IndexResult update_1_result = engine.index(update_1);
+        assertThat(update_1_result.getVersion(), equalTo(2L));
+
+        try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) {
+            assertEquals(2, get.version());
+        }
+
+        Engine.Index update_2 = new Engine.Index(newUid(doc), doc, 2);
+        Engine.IndexResult update_2_result = engine.index(update_2);
+        assertThat(update_2_result.getVersion(), equalTo(3L));
+
+        try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) {
+            assertEquals(3, get.version());
+        }
+
+    }
+
     public void testVersioningNewIndex() throws IOException {
         ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null);
         Engine.Index index = indexForDoc(doc);

@@ -1337,12 +1421,14 @@ public class InternalEngineTests extends ESTestCase {
             assertEquals(numDocs, test.reader().numDocs());
         }
         engine.forceMerge(true, 1, false, false, false);
+        engine.refresh("test");
         assertEquals(engine.segments(true).size(), 1);

         ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), B_1, null);
         Engine.Index index = indexForDoc(doc);
         engine.delete(new Engine.Delete(index.type(), index.id(), index.uid()));
         engine.forceMerge(true, 10, true, false, false); //expunge deletes
+        engine.refresh("test");

         assertEquals(engine.segments(true).size(), 1);
         try (Engine.Searcher test = engine.acquireSearcher("test")) {

@@ -1354,7 +1440,7 @@ public class InternalEngineTests extends ESTestCase {
         index = indexForDoc(doc);
         engine.delete(new Engine.Delete(index.type(), index.id(), index.uid()));
         engine.forceMerge(true, 10, false, false, false); //expunge deletes
-
+        engine.refresh("test");
         assertEquals(engine.segments(true).size(), 1);
         try (Engine.Searcher test = engine.acquireSearcher("test")) {
             assertEquals(numDocs - 2, test.reader().numDocs());

@@ -1561,6 +1647,7 @@ public class InternalEngineTests extends ESTestCase {
             }
             if (randomBoolean()) {
                 engine.flush();
+                engine.refresh("test");
             }
             firstOp = false;
         }

@@ -1716,11 +1803,12 @@ public class InternalEngineTests extends ESTestCase {
             }
             if (randomBoolean()) {
                 engine.flush();
+                engine.refresh("test");
             }

             if (rarely()) {
                 // simulate GC deletes
-                engine.refresh("gc_simulation");
+                engine.refresh("gc_simulation", Engine.SearcherScope.INTERNAL);
                 engine.clearDeletedTombstones();
                 if (docDeleted) {
                     lastOpVersion = Versions.NOT_FOUND;

@@ -1805,6 +1893,7 @@ public class InternalEngineTests extends ESTestCase {
             }
             if (randomBoolean()) {
                 engine.flush();
+                engine.refresh("test");
             }
         }

@@ -1884,7 +1973,7 @@ public class InternalEngineTests extends ESTestCase {
         ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null);
         final Term uidTerm = newUid(doc);
         engine.index(indexForDoc(doc));
-        final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
+        final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;
         for (int i = 0; i < thread.length; i++) {
             thread[i] = new Thread(() -> {
                 startGun.countDown();

@@ -2314,7 +2403,7 @@ public class InternalEngineTests extends ESTestCase {
             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
             engine.config().setEnableGcDeletes(false);

-            final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
+            final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;

             // Add document
             Document document = testDocument();

@@ -2644,6 +2733,7 @@ public class InternalEngineTests extends ESTestCase {
         assertThat(indexResult.getVersion(), equalTo(1L));
         if (flush) {
             engine.flush();
+            engine.refresh("test");
         }

         doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);

@@ -3847,7 +3937,7 @@ public class InternalEngineTests extends ESTestCase {
         document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
         final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null);
         final Term uid = newUid(doc);
-        final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
+        final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;
         for (int i = 0; i < numberOfOperations; i++) {
             if (randomBoolean()) {
                 final Engine.Index index = new Engine.Index(

@@ -4203,4 +4293,58 @@ public class InternalEngineTests extends ESTestCase {
             IOUtils.close(recoveringEngine);
         }
     }
+
+
+    public void assertSameReader(Searcher left, Searcher right) {
+        List<LeafReaderContext> leftLeaves = ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves();
+        List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
+        assertEquals(rightLeaves.size(), leftLeaves.size());
+        for (int i = 0; i < leftLeaves.size(); i++) {
+            assertSame(leftLeaves.get(i).reader(), rightLeaves.get(0).reader());
+        }
+    }
+
+    public void assertNotSameReader(Searcher left, Searcher right) {
+        List<LeafReaderContext> leftLeaves = ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves();
+        List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
+        if (rightLeaves.size() == leftLeaves.size()) {
+            for (int i = 0; i < leftLeaves.size(); i++) {
+                if (leftLeaves.get(i).reader() != rightLeaves.get(0).reader()) {
+                    return; // all is well
+                }
+            }
+            fail("readers are same");
+        }
+    }
+
+    public void testRefreshScopedSearcher() throws IOException {
+        try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
+             Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
+            assertSameReader(getSearcher, searchSearcher);
+        }
+        for (int i = 0; i < 10; i++) {
+            final String docId = Integer.toString(i);
+            final ParsedDocument doc =
+                testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null);
+            Engine.Index primaryResponse = indexForDoc(doc);
+            engine.index(primaryResponse);
+        }
+        assertTrue(engine.refreshNeeded());
+        engine.refresh("test", Engine.SearcherScope.INTERNAL);
+        try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
+             Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
+            assertEquals(10, getSearcher.reader().numDocs());
+            assertEquals(0, searchSearcher.reader().numDocs());
+            assertNotSameReader(getSearcher, searchSearcher);
+        }
+
+        engine.refresh("test", Engine.SearcherScope.EXTERNAL);
+
+        try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
+             Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
+            assertEquals(10, getSearcher.reader().numDocs());
+            assertEquals(10, searchSearcher.reader().numDocs());
+            assertSameReader(getSearcher, searchSearcher);
+        }
+    }
 }

@@ -79,7 +79,7 @@ public class UidTests extends ESTestCase {
         for (int iter = 0; iter < iters; ++iter) {
             final String id = TestUtil.randomRealisticUnicodeString(random(), 1, 10);
             BytesRef encoded = Uid.encodeId(id);
-            assertEquals(id, Uid.decodeId(Arrays.copyOfRange(encoded.bytes, encoded.offset, encoded.offset + encoded.length)));
+            assertEquals(id, doDecodeId(encoded));
             assertTrue(encoded.length <= 1 + new BytesRef(id).length);
         }
     }

@@ -93,7 +93,7 @@ public class UidTests extends ESTestCase {
                 id = "0" + id;
             }
             BytesRef encoded = Uid.encodeId(id);
-            assertEquals(id, Uid.decodeId(Arrays.copyOfRange(encoded.bytes, encoded.offset, encoded.offset + encoded.length)));
+            assertEquals(id, doDecodeId(encoded));
             assertEquals(1 + (id.length() + 1) / 2, encoded.length);
         }
     }

@@ -105,9 +105,26 @@ public class UidTests extends ESTestCase {
             random().nextBytes(binaryId);
             final String id = Base64.getUrlEncoder().withoutPadding().encodeToString(binaryId);
             BytesRef encoded = Uid.encodeId(id);
-            assertEquals(id, Uid.decodeId(Arrays.copyOfRange(encoded.bytes, encoded.offset, encoded.offset + encoded.length)));
+            assertEquals(id, doDecodeId(encoded));
             assertTrue(encoded.length <= 1 + binaryId.length);
         }
     }

+    private static String doDecodeId(BytesRef encoded) {
+
+        if (randomBoolean()) {
+            return Uid.decodeId(Arrays.copyOfRange(encoded.bytes, encoded.offset, encoded.offset + encoded.length));
+        } else {
+            if (randomBoolean()) {
+                BytesRef slicedCopy = new BytesRef(randomIntBetween(encoded.length + 1, encoded.length + 100));
+                slicedCopy.offset = randomIntBetween(1, slicedCopy.bytes.length - encoded.length);
+                slicedCopy.length = encoded.length;
+                System.arraycopy(encoded.bytes, encoded.offset, slicedCopy.bytes, slicedCopy.offset, encoded.length);
+                assertArrayEquals(Arrays.copyOfRange(encoded.bytes, encoded.offset, encoded.offset + encoded.length),
+                    Arrays.copyOfRange(slicedCopy.bytes, slicedCopy.offset, slicedCopy.offset + slicedCopy.length));
+                encoded = slicedCopy;
+            }
+            return Uid.decodeId(encoded.bytes, encoded.offset, encoded.length);
+        }
+    }
 }

@@ -1162,7 +1162,7 @@ public class IndexShardTests extends IndexShardTestCase {
         indexDoc(shard, "test", "test");
         try (Engine.GetResult ignored = shard.get(new Engine.Get(true, "test", "test",
             new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) {
-            assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount + 1));
+            assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount));
         }
         closeShards(shard);
     }

@@ -270,7 +270,6 @@ public class RefreshListenersTests extends ESTestCase {
      * Uses a bunch of threads to index, wait for refresh, and non-realtime get documents to validate that they are visible after waiting
      * regardless of what crazy sequence of events causes the refresh listener to fire.
      */
-    @TestLogging("_root:debug,org.elasticsearch.index.engine.Engine.DW:trace")
     public void testLotsOfThreads() throws Exception {
         int threadCount = between(3, 10);
         maxListeners = between(1, threadCount * 2);

@@ -573,6 +573,7 @@ public class IndexStatsIT extends ESIntegTestCase {

         client().admin().indices().prepareFlush().get();
         client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
+        client().admin().indices().prepareRefresh().get();
         stats = client().admin().indices().prepareStats().setSegments(true).get();

         assertThat(stats.getTotal().getSegments(), notNullValue());

@@ -83,12 +83,10 @@ public class SimpleNestedIT extends ESIntegTestCase {
                 .endObject()).execute().actionGet();

         waitForRelocation(ClusterHealthStatus.GREEN);
-        // flush, so we fetch it from the index (as see that we filter nested docs)
-        flush();
         GetResponse getResponse = client().prepareGet("test", "type1", "1").get();
         assertThat(getResponse.isExists(), equalTo(true));
         assertThat(getResponse.getSourceAsBytes(), notNullValue());

+        refresh();
         // check the numDocs
         assertDocumentCount("test", 3);

@@ -126,8 +124,7 @@ public class SimpleNestedIT extends ESIntegTestCase {
                 .endArray()
                 .endObject()).execute().actionGet();
         waitForRelocation(ClusterHealthStatus.GREEN);
-        // flush, so we fetch it from the index (as see that we filter nested docs)
-        flush();
+        refresh();
         assertDocumentCount("test", 6);

         searchResponse = client().prepareSearch("test").setQuery(nestedQuery("nested1",

@@ -151,8 +148,7 @@ public class SimpleNestedIT extends ESIntegTestCase {
         DeleteResponse deleteResponse = client().prepareDelete("test", "type1", "2").execute().actionGet();
         assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());

-        // flush, so we fetch it from the index (as see that we filter nested docs)
-        flush();
+        refresh();
         assertDocumentCount("test", 3);

         searchResponse = client().prepareSearch("test").setQuery(nestedQuery("nested1", termQuery("nested1.n_field1", "n_value1_1"), ScoreMode.Avg)).execute().actionGet();

@@ -179,11 +175,10 @@ public class SimpleNestedIT extends ESIntegTestCase {
                 .endArray()
                 .endObject()).execute().actionGet();

-        // flush, so we fetch it from the index (as see that we filter nested docs)
-        flush();
         GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
         assertThat(getResponse.isExists(), equalTo(true));
+        waitForRelocation(ClusterHealthStatus.GREEN);
+        refresh();
         // check the numDocs
         assertDocumentCount("test", 7);

@@ -45,7 +45,7 @@ import java.util.function.Consumer;

 import static org.hamcrest.Matchers.equalTo;

-/** Unit tests for TCPTransport */
+/** Unit tests for {@link TcpTransport} */
 public class TcpTransportTests extends ESTestCase {

     /** Test ipv4 host with a default port works */

@@ -191,7 +191,7 @@ public class TcpTransportTests extends ESTestCase {
         }

         @Override
-        protected void closeChannels(List channel, boolean blocking, boolean closingTransport) throws IOException {
+        protected void closeChannels(List channel, boolean blocking, boolean doNotLinger) throws IOException {

         }

@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|

@@ -268,12 +269,10 @@ public class SimpleVersioningIT extends ESIntegTestCase {
         assertThat(indexResponse.getVersion(), equalTo(1L));

-        client().admin().indices().prepareFlush().execute().actionGet();

         indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet();
         assertThat(indexResponse.getVersion(), equalTo(2L));

-        client().admin().indices().prepareFlush().execute().actionGet();

         assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(),
             VersionConflictEngineException.class);

@@ -286,13 +285,16 @@ public class SimpleVersioningIT extends ESIntegTestCase {
         assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);
         assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);

+        client().admin().indices().prepareRefresh().execute().actionGet();
         for (int i = 0; i < 10; i++) {
             assertThat(client().prepareGet("test", "type", "1").execute().actionGet().getVersion(), equalTo(2L));
         }

+        client().admin().indices().prepareRefresh().execute().actionGet();
+
         for (int i = 0; i < 10; i++) {
-            SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setVersion(true).execute().actionGet();
+            SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setVersion(true).
+                execute().actionGet();
             assertHitCount(searchResponse, 1);
             assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(2L));
         }
     }
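
The hunk above exercises internal-versioning conflicts, now with explicit refreshes for visibility. A minimal sketch of that contract, assuming an ESIntegTestCase-style client() and an existing "test" index (imports for IndexResponse and VersionConflictEngineException assumed):

    // Version starts at 1 on first index and must match on subsequent writes.
    IndexResponse created = client().prepareIndex("test", "type", "1").setSource("field1", "value1").get();
    assertThat(created.getVersion(), equalTo(1L));

    // Matching version: accepted, document moves to version 2.
    client().prepareIndex("test", "type", "1").setSource("field1", "value2").setVersion(1).get();

    try {
        // Stale version: rejected, the document is already at version 2.
        client().prepareIndex("test", "type", "1").setSource("field1", "stale").setVersion(1).get();
    } catch (VersionConflictEngineException e) {
        // expected
    }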

@@ -27,6 +27,11 @@ The last (`node.role`, `master`, and `name`) columns provide ancillary
 information that can often be useful when looking at the cluster as a whole,
 particularly large ones. How many master-eligible nodes do I have?

+The `nodes` API accepts an additional URL parameter `full_id` accepting `true`
+or `false`. The purpose of this parameter is to format the ID field (if
+requested with `id` or `nodeId`) in its full length or in abbreviated form (the
+default).
+
 [float]
 === Columns
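
A minimal sketch of exercising the new parameter with the low-level Java REST client, assuming a node reachable on localhost:9200 (the performRequest(method, endpoint, params) variant used here is the one this era of the client provides):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.http.HttpHost;
    import org.apache.http.util.EntityUtils;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;

    public final class CatNodesFullId {
        public static void main(String[] args) throws Exception {
            try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
                Map<String, String> params = new HashMap<>();
                params.put("full_id", "true"); // full-length node ids instead of the abbreviated default
                params.put("h", "id,name");    // the id column must be requested explicitly
                Response response = client.performRequest("GET", "/_cat/nodes", params);
                System.out.println(EntityUtils.toString(response.getEntity()));
            }
        }
    }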

@@ -98,8 +98,8 @@ parameter in the same way as the search api.
 So far we've only been updating documents without changing their source. That
 is genuinely useful for things like
 <<picking-up-a-new-property,picking up new properties>> but it's only half the
-fun. `_update_by_query` supports a `script` object to update the document. This
-will increment the `likes` field on all of kimchy's tweets:
+fun. `_update_by_query` <<modules-scripting-using,supports scripts>> to update
+the document. This will increment the `likes` field on all of kimchy's tweets:

 [source,js]
 --------------------------------------------------

@@ -5,7 +5,7 @@ A similarity (scoring / ranking model) defines how matching documents
 are scored. Similarity is per field, meaning that via the mapping one
 can define a different similarity per field.

-Configuring a custom similarity is considered a expert feature and the
+Configuring a custom similarity is considered an expert feature and the
 builtin similarities are most likely sufficient as is described in
 <<similarity>>.

@@ -7,8 +7,10 @@ applied when new indices are created. The templates include both
 and a simple pattern template that controls whether the template should be
 applied to the new index.

-NOTE: Templates are only applied at index creation time. Changing a template
-will have no impact on existing indices.
+NOTE: Templates are only applied at index creation time. Changing a template
+will have no impact on existing indices. When using the create index API, the
+settings/mappings defined as part of the create index call will take precedence
+over any matching settings/mappings defined in the template.

 For example:
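
A sketch of the precedence rule the new NOTE describes, in transport-client style (the template and index names are hypothetical):

    import java.util.Collections;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.Settings;

    final class TemplatePrecedence {
        static void example(Client client) {
            // A template matching "logs-*" asks for 2 shards...
            client.admin().indices().preparePutTemplate("logs_template")
                    .setPatterns(Collections.singletonList("logs-*"))
                    .setSettings(Settings.builder().put("index.number_of_shards", 2))
                    .get();

            // ...but the explicit create-index setting wins: "logs-2017-10" gets 1 shard.
            client.admin().indices().prepareCreate("logs-2017-10")
                    .setSettings(Settings.builder().put("index.number_of_shards", 1))
                    .get();
        }
    }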

@@ -10,3 +10,12 @@ index names may no longer contain `:`.

 Negative values were interpreted as zero in earlier versions but are no
 longer accepted.
+
+==== `_flush` and `_force_merge` will no longer refresh
+
+In previous versions issuing a `_flush` or `_force_merge` (with `flush=true`)
+had the undocumented side-effect of refreshing the index which made new documents
+visible to searches and non-realtime GET operations. From now on these operations
+don't have this side-effect anymore. To make documents visible an explicit `_refresh`
+call is needed unless the index is refreshed by the internal scheduler.
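
A short sketch of the new contract in ESIntegTestCase style (index and document names are hypothetical), mirroring the flush-to-refresh substitutions made throughout the tests in this commit:

    client().prepareIndex("test", "doc", "1").setSource("f", "v").get();
    client().admin().indices().prepareFlush("test").get();   // durable on disk, but no longer implies a refresh
    // A search at this point may not see document "1" yet.
    client().admin().indices().prepareRefresh("test").get(); // explicit refresh makes it visible to searches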

@@ -49,10 +49,7 @@ GET my_index/_search

 `lang`::

-    Specifies the language the script is written in. Defaults to `painless` but
-    may be set to any of languages listed in <<modules-scripting>>. The
-    default language may be changed in the `elasticsearch.yml` config file by
-    setting `script.default_lang` to the appropriate language.
+    Specifies the language the script is written in. Defaults to `painless`.


 `source`, `id`::

@@ -108,6 +105,30 @@ minute will be compiled. You can change this setting dynamically by setting

 ========================================

+[float]
+[[modules-scripting-short-script-form]]
+=== Short Script Form
+A short script form can be used for brevity. In the short form, `script` is represented
+by a string instead of an object. This string contains the source of the script.
+
+Short form:
+
+[source,js]
+----------------------
+"script": "ctx._source.likes++"
+----------------------
+// NOTCONSOLE
+
+The same script in the normal form:
+
+[source,js]
+----------------------
+"script": {
+  "source": "ctx._source.likes++"
+}
+----------------------
+// NOTCONSOLE
+
 [float]
 [[modules-scripting-stored-scripts]]
 === Stored Scripts
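
The same short-vs-full distinction exists in the Java Script class. A sketch assuming the 6.x constructors, where the one-argument form fills in the defaults (inline type, painless, empty params) that the full constructor spells out:

    import java.util.Collections;
    import org.elasticsearch.script.Script;
    import org.elasticsearch.script.ScriptType;

    public final class ScriptForms {
        public static void main(String[] args) {
            Script shortForm = new Script("ctx._source.likes++");
            Script fullForm = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                    "ctx._source.likes++", Collections.emptyMap());
            System.out.println(shortForm.equals(fullForm)); // both describe the same inline painless script
        }
    }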

@@ -593,10 +593,9 @@ public class ChildQuerySearchIT extends ParentChildTestCase {

         createIndexRequest("test", "parent", "1", null, "p_field", 1).get();
         createIndexRequest("test", "child", "2", "1", "c_field", 1).get();
-        client().admin().indices().prepareFlush("test").get();

         client().prepareIndex("test", legacy() ? "type1" : "doc", "3").setSource("p_field", 1).get();
-        client().admin().indices().prepareFlush("test").get();
+        refresh();

         SearchResponse searchResponse = client().prepareSearch("test")
                 .setQuery(boolQuery().must(matchAllQuery()).filter(hasChildQuery("child", matchAllQuery(), ScoreMode.None))).get();

@@ -881,8 +880,8 @@ public class ChildQuerySearchIT extends ParentChildTestCase {
         } else {
             client().prepareIndex("test", "doc", "3").setSource("p_field", 2).get();
         }
-        client().admin().indices().prepareFlush("test").get();

+        refresh();
         SearchResponse searchResponse = client().prepareSearch("test")
                 .setQuery(boolQuery().must(matchAllQuery()).filter(hasChildQuery("child", termQuery("c_field", 1), ScoreMode.None)))
                 .get();

@@ -911,7 +910,7 @@ public class ChildQuerySearchIT extends ParentChildTestCase {

         createIndexRequest("test", "parent", "1", null, "p_field", 1).get();
         createIndexRequest("test", "child", "2", "1", "c_field", "foo bar").get();
-        client().admin().indices().prepareFlush("test").get();
+        refresh();

         SearchResponse searchResponse = client().prepareSearch("test").setQuery(
                 hasChildQuery("child", matchQuery("c_field", "foo"), ScoreMode.None)

@@ -331,8 +331,8 @@ public class Netty4Transport extends TcpTransport<Channel> {
     }

     @Override
-    protected void closeChannels(final List<Channel> channels, boolean blocking, boolean closingTransport) throws IOException {
-        if (closingTransport) {
+    protected void closeChannels(final List<Channel> channels, boolean blocking, boolean doNotLinger) throws IOException {
+        if (doNotLinger) {
             for (Channel channel : channels) {
                 /* We set SO_LINGER timeout to 0 to ensure that when we shutdown the node we don't have a gazillion connections sitting
                  * in TIME_WAIT to free up resources quickly. This is really the only part where we close the connection from the server
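
A self-contained sketch of the SO_LINGER=0 trick in plain JDK NIO (not Elasticsearch API): closing a socket with linger 0 sends an RST instead of a FIN, so the closing side skips TIME_WAIT and frees its resources immediately.

    import java.io.IOException;
    import java.net.StandardSocketOptions;
    import java.nio.channels.SocketChannel;

    public final class LingerExample {
        static void closeWithoutLinger(SocketChannel channel) throws IOException {
            // Guard as the transports above do: not every channel supports SO_LINGER.
            if (channel.isOpen() && channel.supportedOptions().contains(StandardSocketOptions.SO_LINGER)) {
                channel.setOption(StandardSocketOptions.SO_LINGER, 0); // RST on close, no TIME_WAIT
            }
            channel.close();
        }
    }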

@@ -40,7 +40,7 @@
       body: {"f1": "v6_mixed", "f2": 10}

   - do:
-      indices.flush:
+      indices.refresh:
         index: test_index

   - do:

@@ -56,7 +56,7 @@
       id: d10

   - do:
-      indices.flush:
+      indices.refresh:
         index: test_index

   - do:

@@ -46,7 +46,7 @@
   - '{"f1": "d_old"}'

   - do:
-      indices.flush:
+      indices.refresh:
         index: test_index,index_with_replicas

   - do:

@@ -36,7 +36,7 @@
   - '{"f1": "v5_upgraded", "f2": 14}'

   - do:
-      indices.flush:
+      indices.refresh:
         index: test_index

   - do:

@@ -10,7 +10,6 @@
     "parts": {
      "repository": {
        "type" : "list",
-       "required": true,
        "description": "Name of repository from which to fetch the snapshot information"
      }
    },

@@ -10,11 +10,6 @@
        "type" : "string",
        "description" : "Script ID",
        "required" : true
-      },
-      "lang" : {
-        "type" : "string",
-        "description" : "Script language",
-        "required" : true
       }
     },
     "params" : {

@@ -10,11 +10,6 @@
        "type" : "string",
        "description" : "Script ID",
        "required" : true
-      },
-      "lang" : {
-        "type" : "string",
-        "description" : "Script language",
-        "required" : true
       }
     },
     "params" : {

@@ -11,10 +11,9 @@
        "description" : "Script ID",
        "required" : true
       },
-      "lang" : {
+      "context" : {
         "type" : "string",
-        "description" : "Script language",
-        "required" : true
+        "description" : "Script context"
       }
     },
     "params" : {

@@ -243,8 +243,8 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
     }

     @Override
-    protected void closeChannels(List<MockChannel> channels, boolean blocking, boolean closingTransport) throws IOException {
-        if (closingTransport) {
+    protected void closeChannels(List<MockChannel> channels, boolean blocking, boolean doNotLinger) throws IOException {
+        if (doNotLinger) {
             for (MockChannel channel : channels) {
                 if (channel.activeChannel != null) {
                     /* We set SO_LINGER timeout to 0 to ensure that when we shutdown the node we don't have a gazillion connections sitting

@@ -99,15 +99,15 @@ public class NioTransport extends TcpTransport<NioChannel> {
     }

     @Override
-    protected void closeChannels(List<NioChannel> channels, boolean blocking, boolean closingTransport) throws IOException {
-        if (closingTransport) {
+    protected void closeChannels(List<NioChannel> channels, boolean blocking, boolean doNotLinger) throws IOException {
+        if (doNotLinger) {
             for (NioChannel channel : channels) {
                 /* We set SO_LINGER timeout to 0 to ensure that when we shutdown the node we don't have a gazillion connections sitting
                  * in TIME_WAIT to free up resources quickly. This is really the only part where we close the connection from the server
                  * side otherwise the client (node) initiates the TCP closing sequence which doesn't cause these issues. Setting this
                  * by default from the beginning can have unexpected side-effects an should be avoided, our protocol is designed
                  * in a way that clients close connection which is how it should be*/
-                if (channel.isOpen()) {
+                if (channel.isOpen() && channel.getRawChannel().supportedOptions().contains(StandardSocketOptions.SO_LINGER)) {
                     channel.getRawChannel().setOption(StandardSocketOptions.SO_LINGER, 0);
                 }
             }