Indexing: clear versionMap on refresh (not flush) to reduce heap usage

The versionMap holds all versions (keyed by _uid) for recently indexed
documents.  Previously we only cleared it during flush, which can be
infrequent if the translog flush thresholds are high, and can cause
excessive heap usage especially for small documents.

Now we clear it during refresh which is usually more frequent
(e.g. once per second by default).

Closes #6379
This commit is contained in:
mikemccand 2014-06-04 05:37:51 -04:00
parent f78480a0bc
commit 50e42265ef
4 changed files with 268 additions and 71 deletions

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine.internal;
import org.elasticsearch.index.translog.Translog;
/** Holds a deleted version, which just adds a timestmap to {@link VersionValue} so we know when we can expire the deletion. */
class DeleteVersionValue extends VersionValue {
private final long time;
public DeleteVersionValue(long version, long time, Translog.Location translogLocation) {
super(version, translogLocation);
this.time = time;
}
@Override
public long time() {
return this.time;
}
@Override
public boolean delete() {
return true;
}
}

View File

@ -54,7 +54,6 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.HashedBytesRef;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.SegmentReaderUtils;
@ -150,7 +149,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
// A uid (in the form of BytesRef) to the version map
// we use the hashed variant since we iterate over it and check removal and additions on existing keys
private final ConcurrentMap<HashedBytesRef, VersionValue> versionMap;
private final LiveVersionMap versionMap;
private final Object[] dirtyLocks;
@ -198,7 +197,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
this.codecService = codecService;
this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, this.compoundOnFlush);
this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65)));
this.versionMap = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
this.versionMap = new LiveVersionMap();
this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough...
for (int i = 0; i < dirtyLocks.length; i++) {
dirtyLocks[i] = new Object();
@ -289,6 +288,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
translog.newTranslog(translogIdGenerator.get());
this.searcherManager = buildSearchManager(indexWriter);
versionMap.setManager(searcherManager);
readLastCommittedSegmentsInfo();
} catch (IOException e) {
try {
@ -322,7 +322,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
public GetResult get(Get get) throws EngineException {
try (InternalLock _ = readLock.acquire()) {
if (get.realtime()) {
VersionValue versionValue = versionMap.get(versionKey(get.uid()));
VersionValue versionValue = versionMap.getUnderLock(get.uid().bytes());
if (versionValue != null) {
if (versionValue.delete()) {
return GetResult.NOT_EXISTS;
@ -402,14 +402,13 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
private void innerCreate(Create create, IndexWriter writer) throws IOException {
synchronized (dirtyLock(create.uid())) {
HashedBytesRef versionKey = versionKey(create.uid());
final long currentVersion;
final VersionValue versionValue;
if (optimizeAutoGenerateId && create.autoGeneratedId() && !create.canHaveDuplicates()) {
currentVersion = Versions.NOT_FOUND;
versionValue = null;
} else {
versionValue = versionMap.get(versionKey);
versionValue = versionMap.getUnderLock(create.uid().bytes());
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(create.uid());
} else {
@ -433,7 +432,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
updatedVersion = create.versionType().updateVersion(currentVersion, expectedVersion);
// if the doc does not exists or it exists but not delete
// if the doc does not exist or it exists but is not deleted
if (versionValue != null) {
if (!versionValue.delete()) {
if (create.origin() == Operation.Origin.RECOVERY) {
@ -460,7 +459,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
Translog.Location translogLocation = translog.add(new Translog.Create(create));
versionMap.put(versionKey, new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));
versionMap.putUnderLock(create.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
indexingService.postCreateUnderLock(create);
}
@ -487,9 +486,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
private void innerIndex(Index index, IndexWriter writer) throws IOException {
synchronized (dirtyLock(index.uid())) {
HashedBytesRef versionKey = versionKey(index.uid());
final long currentVersion;
VersionValue versionValue = versionMap.get(versionKey);
VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(index.uid());
} else {
@ -533,7 +531,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
Translog.Location translogLocation = translog.add(new Translog.Index(index));
versionMap.put(versionKey, new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
indexingService.postIndexUnderLock(index);
}
@ -559,8 +557,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
synchronized (dirtyLock(delete.uid())) {
final long currentVersion;
HashedBytesRef versionKey = versionKey(delete.uid());
VersionValue versionValue = versionMap.get(versionKey);
VersionValue versionValue = versionMap.getUnderLock(delete.uid().bytes());
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(delete.uid());
} else {
@ -582,23 +579,23 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
final boolean found;
if (currentVersion == Versions.NOT_FOUND) {
// doc does not exists and no prior deletes
delete.updateVersion(updatedVersion, false);
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
versionMap.put(versionKey, new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
// doc does not exist and no prior deletes
found = false;
} else if (versionValue != null && versionValue.delete()) {
// a "delete on delete", in this case, we still increment the version, log it, and return that version
delete.updateVersion(updatedVersion, false);
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
versionMap.put(versionKey, new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
found = false;
} else {
delete.updateVersion(updatedVersion, true);
// we deleted a currently existing document
writer.deleteDocuments(delete.uid());
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
versionMap.put(versionKey, new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
found = true;
}
delete.updateVersion(updatedVersion, found);
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
versionMap.putDeleteUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, threadPool.estimatedTimeInMillis(), translogLocation));
indexingService.postDeleteUnderLock(delete);
}
}
@ -632,7 +629,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
throw new DeleteByQueryFailedEngineException(shardId, delete, t);
}
//TODO: This is heavy, since we refresh, but we really have to...
refreshVersioningTable(System.currentTimeMillis());
pruneDeletedVersions(System.currentTimeMillis());
}
@Override
@ -771,12 +768,14 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
SearcherManager current = this.searcherManager;
this.searcherManager = buildSearchManager(indexWriter);
versionMap.setManager(searcherManager);
try {
IOUtils.close(current);
} catch (Throwable t) {
logger.warn("Failed to close current SearcherManager", t);
}
refreshVersioningTable(threadPool.estimatedTimeInMillis());
pruneDeletedVersions(threadPool.estimatedTimeInMillis());
} catch (Throwable t) {
throw new FlushFailedEngineException(shardId, t);
}
@ -795,7 +794,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
translog.newTransientTranslog(translogId);
indexWriter.setCommitData(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
indexWriter.commit();
refreshVersioningTable(threadPool.estimatedTimeInMillis());
pruneDeletedVersions(threadPool.estimatedTimeInMillis());
// we need to move transient to current only after we refresh
// so items added to current will still be around for realtime get
// when tans overrides it
@ -865,25 +864,29 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
return writer;
}
private void refreshVersioningTable(long time) {
private void pruneDeletedVersions(long time) {
// we need to refresh in order to clear older version values
refresh(new Refresh("version_table").force(true));
for (Map.Entry<HashedBytesRef, VersionValue> entry : versionMap.entrySet()) {
HashedBytesRef uid = entry.getKey();
synchronized (dirtyLock(uid.bytes)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
VersionValue versionValue = versionMap.get(uid);
// TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...
// we only need to prune deletes; the adds/updates are cleared whenever reader is refreshed:
for (Map.Entry<BytesRef, VersionValue> entry : versionMap.getAllDeletes()) {
BytesRef uid = entry.getKey();
synchronized (dirtyLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
VersionValue versionValue = versionMap.getDeleteUnderLock(uid);
if (versionValue == null) {
// another thread has re-added this uid since we started refreshing:
continue;
}
if (time - versionValue.time() <= 0) {
continue; // its a newer value, from after/during we refreshed, don't clear it
}
if (versionValue.delete()) {
assert versionValue.delete();
if (enableGcDeletes && (time - versionValue.time()) > gcDeletesInMillis) {
versionMap.remove(uid);
}
} else {
versionMap.remove(uid);
versionMap.removeDeleteUnderLock(uid);
}
}
}
@ -1204,10 +1207,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
}
private HashedBytesRef versionKey(Term uid) {
return new HashedBytesRef(uid.bytes());
}
private Object dirtyLock(BytesRef uid) {
int hash = DjbHashFunction.DJB_HASH(uid.bytes, uid.offset, uid.length);
return dirtyLocks[MathUtils.mod(hash, dirtyLocks.length)];
@ -1415,36 +1414,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
}
static class VersionValue {
private final long version;
private final boolean delete;
private final long time;
private final Translog.Location translogLocation;
VersionValue(long version, boolean delete, long time, Translog.Location translogLocation) {
this.version = version;
this.delete = delete;
this.time = time;
this.translogLocation = translogLocation;
}
public long time() {
return this.time;
}
public long version() {
return version;
}
public boolean delete() {
return delete;
}
public Translog.Location translogLocation() {
return this.translogLocation;
}
}
class SearchFactory extends SearcherFactory {
@Override

View File

@ -0,0 +1,137 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine.internal;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
// TODO: use Lucene's LiveFieldValues, but we need to somehow extend it to handle SearcherManager changing, and to handle long-lasting (GC'd
// by time) tombstones
/** Maps _uid value to its version information. */
class LiveVersionMap implements ReferenceManager.RefreshListener {
// All writes go into here:
private volatile Map<BytesRef,VersionValue> addsCurrent = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
// Only used while refresh is running:
private volatile Map<BytesRef,VersionValue> addsOld = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
// Holds tombstones for deleted docs, expiring by their own schedule; not private so InternalEngine can prune:
private final Map<BytesRef,VersionValue> deletes = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
private ReferenceManager mgr;
public void setManager(ReferenceManager newMgr) {
if (mgr != null) {
mgr.removeListener(this);
}
mgr = newMgr;
// So we are notified when reopen starts and finishes
mgr.addListener(this);
}
@Override
public void beforeRefresh() throws IOException {
addsOld = addsCurrent;
// Start sending all updates after this point to the new
// map. While reopen is running, any lookup will first
// try this new map, then fallback to old, then to the
// current searcher:
addsCurrent = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
}
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
// Now drop all the old values because they are now
// visible via the searcher that was just opened; if
// didRefresh is false, it's possible old has some
// entries in it, which is fine: it means they were
// actually already included in the previously opened
// reader. So we can safely clear old here:
addsOld = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
}
/** Caller has a lock, so that this uid will not be concurrently added/deleted by another thread. */
public VersionValue getUnderLock(BytesRef uid) {
// First try to get the "live" value:
VersionValue value = addsCurrent.get(uid);
if (value != null) {
return value;
}
value = addsOld.get(uid);
if (value != null) {
return value;
}
value = deletes.get(uid);
if (value != null) {
return value;
}
return null;
}
/** Adds this uid/version to the pending adds map. */
public void putUnderLock(BytesRef uid, VersionValue version) {
deletes.remove(uid);
addsCurrent.put(uid, version);
}
/** Adds this uid/version to the pending deletes map. */
public void putDeleteUnderLock(BytesRef uid, VersionValue version) {
addsCurrent.remove(uid);
addsOld.remove(uid);
deletes.put(uid, version);
}
/** Returns the current deleted version for this uid. */
public VersionValue getDeleteUnderLock(BytesRef uid) {
return deletes.get(uid);
}
/** Removes this uid from the pending deletes map. */
public void removeDeleteUnderLock(BytesRef uid) {
deletes.remove(uid);
}
/** Iterates over all pending deletions. */
public Iterable<Map.Entry<BytesRef,VersionValue>> getAllDeletes() {
return deletes.entrySet();
}
/** Called when this index is closed. */
public void clear() {
addsCurrent.clear();
addsOld.clear();
deletes.clear();
if (mgr != null) {
mgr.removeListener(this);
mgr = null;
}
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine.internal;
import org.elasticsearch.index.translog.Translog;
class VersionValue {
private final long version;
private final Translog.Location translogLocation;
public VersionValue(long version, Translog.Location translogLocation) {
this.version = version;
this.translogLocation = translogLocation;
}
public long time() {
throw new UnsupportedOperationException();
}
public long version() {
return version;
}
public boolean delete() {
return false;
}
public Translog.Location translogLocation() {
return this.translogLocation;
}
}