use bytes instead of String as key in versionMap
no need to create a String every time we put or get a value from the version map
This commit is contained in:
parent
15fcb17a81
commit
1c7d2442c8
|
@ -37,6 +37,17 @@ public class DjbHashFunction implements HashFunction {
|
|||
return (int) hash;
|
||||
}
|
||||
|
||||
public static int DJB_HASH(byte[] value, int offset, int length) {
|
||||
long hash = 5381;
|
||||
|
||||
final int end = offset + length;
|
||||
for (int i = offset; i < end; i++) {
|
||||
hash = ((hash << 5) + hash) + value[i];
|
||||
}
|
||||
|
||||
return (int) hash;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hash(String routing) {
|
||||
return DJB_HASH(routing);
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.lucene.search.DocIdSetIterator;
|
|||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.Numbers;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.index.codec.postingsformat.BloomFilterPostingsFormat;
|
||||
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.lucene.search.Query;
|
|||
import org.apache.lucene.search.SearcherFactory;
|
||||
import org.apache.lucene.search.SearcherManager;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.ElasticSearchIllegalStateException;
|
||||
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
|
||||
|
@ -33,6 +34,7 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.Preconditions;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lucene.HashedBytesRef;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.search.XFilteredQuery;
|
||||
import org.elasticsearch.common.lucene.uid.UidField;
|
||||
|
@ -131,7 +133,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
private volatile int onGoingRecoveries = 0;
|
||||
|
||||
|
||||
private final ConcurrentMap<String, VersionValue> versionMap;
|
||||
// A uid (in the form of BytesRef) to the version map
|
||||
// we use the hashed variant since we iterate over it and check removal and additions on existing keys
|
||||
private final ConcurrentMap<HashedBytesRef, VersionValue> versionMap;
|
||||
|
||||
private final Object[] dirtyLocks;
|
||||
|
||||
|
@ -305,7 +309,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
rwl.readLock().lock();
|
||||
try {
|
||||
if (get.realtime()) {
|
||||
VersionValue versionValue = versionMap.get(get.uid().text());
|
||||
VersionValue versionValue = versionMap.get(versionKey(get.uid()));
|
||||
if (versionValue != null) {
|
||||
if (versionValue.delete()) {
|
||||
return GetResult.NOT_EXISTS;
|
||||
|
@ -381,7 +385,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
synchronized (dirtyLock(create.uid())) {
|
||||
UidField uidField = create.uidField();
|
||||
final long currentVersion;
|
||||
VersionValue versionValue = versionMap.get(create.uid().text());
|
||||
VersionValue versionValue = versionMap.get(versionKey(create.uid()));
|
||||
if (versionValue == null) {
|
||||
currentVersion = loadCurrentVersionFromIndex(create.uid());
|
||||
} else {
|
||||
|
@ -467,7 +471,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
}
|
||||
Translog.Location translogLocation = translog.add(new Translog.Create(create));
|
||||
|
||||
versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));
|
||||
versionMap.put(versionKey(create.uid()), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));
|
||||
|
||||
indexingService.postCreateUnderLock(create);
|
||||
}
|
||||
|
@ -505,7 +509,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
synchronized (dirtyLock(index.uid())) {
|
||||
UidField uidField = index.uidField();
|
||||
final long currentVersion;
|
||||
VersionValue versionValue = versionMap.get(index.uid().text());
|
||||
VersionValue versionValue = versionMap.get(versionKey(index.uid()));
|
||||
if (versionValue == null) {
|
||||
currentVersion = loadCurrentVersionFromIndex(index.uid());
|
||||
} else {
|
||||
|
@ -581,7 +585,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
}
|
||||
Translog.Location translogLocation = translog.add(new Translog.Index(index));
|
||||
|
||||
versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));
|
||||
versionMap.put(versionKey(index.uid()), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));
|
||||
|
||||
indexingService.postIndexUnderLock(index);
|
||||
}
|
||||
|
@ -617,7 +621,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
|
||||
synchronized (dirtyLock(delete.uid())) {
|
||||
final long currentVersion;
|
||||
VersionValue versionValue = versionMap.get(delete.uid().text());
|
||||
VersionValue versionValue = versionMap.get(versionKey(delete.uid()));
|
||||
if (versionValue == null) {
|
||||
currentVersion = loadCurrentVersionFromIndex(delete.uid());
|
||||
} else {
|
||||
|
@ -674,17 +678,17 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
// doc does not exists and no prior deletes
|
||||
delete.version(updatedVersion).notFound(true);
|
||||
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
|
||||
versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
|
||||
versionMap.put(versionKey(delete.uid()), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
|
||||
} else if (versionValue != null && versionValue.delete()) {
|
||||
// a "delete on delete", in this case, we still increment the version, log it, and return that version
|
||||
delete.version(updatedVersion).notFound(true);
|
||||
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
|
||||
versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
|
||||
versionMap.put(versionKey(delete.uid()), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
|
||||
} else {
|
||||
delete.version(updatedVersion);
|
||||
writer.deleteDocuments(delete.uid());
|
||||
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
|
||||
versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
|
||||
versionMap.put(versionKey(delete.uid()), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
|
||||
}
|
||||
|
||||
indexingService.postDeleteUnderLock(delete);
|
||||
|
@ -726,7 +730,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Searcher searcher() throws EngineException{
|
||||
public Searcher searcher() throws EngineException {
|
||||
SearcherManager manager = this.searcherManager;
|
||||
try {
|
||||
IndexSearcher searcher = manager.acquire();
|
||||
|
@ -954,10 +958,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
private void refreshVersioningTable(long time) {
|
||||
// we need to refresh in order to clear older version values
|
||||
refresh(new Refresh(true).force(true));
|
||||
for (Map.Entry<String, VersionValue> entry : versionMap.entrySet()) {
|
||||
String id = entry.getKey();
|
||||
synchronized (dirtyLock(id)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
|
||||
VersionValue versionValue = versionMap.get(id);
|
||||
for (Map.Entry<HashedBytesRef, VersionValue> entry : versionMap.entrySet()) {
|
||||
HashedBytesRef uid = entry.getKey();
|
||||
synchronized (dirtyLock(uid.bytes)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
|
||||
VersionValue versionValue = versionMap.get(uid);
|
||||
if (versionValue == null) {
|
||||
continue;
|
||||
}
|
||||
|
@ -966,10 +970,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
}
|
||||
if (versionValue.delete()) {
|
||||
if (enableGcDeletes && (time - versionValue.time()) > gcDeletesInMillis) {
|
||||
versionMap.remove(id);
|
||||
versionMap.remove(uid);
|
||||
}
|
||||
} else {
|
||||
versionMap.remove(id);
|
||||
versionMap.remove(uid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1290,8 +1294,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
}
|
||||
}
|
||||
|
||||
private Object dirtyLock(String id) {
|
||||
int hash = DjbHashFunction.DJB_HASH(id);
|
||||
private HashedBytesRef versionKey(Term uid) {
|
||||
return new HashedBytesRef(uid.bytes());
|
||||
}
|
||||
|
||||
private Object dirtyLock(BytesRef uid) {
|
||||
int hash = DjbHashFunction.DJB_HASH(uid.bytes, uid.offset, uid.length);
|
||||
// abs returns Integer.MIN_VALUE, so we need to protect against it...
|
||||
if (hash == Integer.MIN_VALUE) {
|
||||
hash = 0;
|
||||
|
@ -1300,7 +1308,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
}
|
||||
|
||||
private Object dirtyLock(Term uid) {
|
||||
return dirtyLock(uid.text());
|
||||
return dirtyLock(uid.bytes());
|
||||
}
|
||||
|
||||
private long loadCurrentVersionFromIndex(Term uid) {
|
||||
|
|
Loading…
Reference in New Issue