mirror of https://github.com/apache/lucene.git
SOLR-7110: Optimize JavaBinCodec to minimize string Object creation
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1673149 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
69af3beaed
commit
185f8ea2c6
|
@ -437,6 +437,8 @@ Optimizations
|
|||
* SOLR-7239: improved performance of min & max in StatsComponent, as well as situations
|
||||
where local params disable all stats (hossman)
|
||||
|
||||
* SOLR-7110: Optimize JavaBinCodec to minimize string Object creation (Noble Paul)
|
||||
|
||||
Other Changes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ package org.apache.solr.util;
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.solr.common.util.Cache;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -40,7 +41,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||
*
|
||||
* @since solr 1.6
|
||||
*/
|
||||
public class ConcurrentLFUCache<K, V> {
|
||||
public class ConcurrentLFUCache<K, V> implements Cache<K,V> {
|
||||
private static Logger log = LoggerFactory.getLogger(ConcurrentLFUCache.class);
|
||||
|
||||
private final ConcurrentHashMap<Object, CacheEntry<K, V>> map;
|
||||
|
@ -84,6 +85,7 @@ public class ConcurrentLFUCache<K, V> {
|
|||
islive = live;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V get(K key) {
|
||||
CacheEntry<K, V> e = map.get(key);
|
||||
if (e == null) {
|
||||
|
@ -97,6 +99,7 @@ public class ConcurrentLFUCache<K, V> {
|
|||
return e.value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V remove(K key) {
|
||||
CacheEntry<K, V> cacheEntry = map.remove(key);
|
||||
if (cacheEntry != null) {
|
||||
|
@ -106,6 +109,7 @@ public class ConcurrentLFUCache<K, V> {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V put(K key, V val) {
|
||||
if (val == null) return null;
|
||||
CacheEntry<K, V> e = new CacheEntry<>(key, val, stats.accessCounter.incrementAndGet());
|
||||
|
@ -305,6 +309,7 @@ public class ConcurrentLFUCache<K, V> {
|
|||
return stats.size.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
map.clear();
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ package org.apache.solr.util;
|
|||
*/
|
||||
|
||||
import org.apache.lucene.util.PriorityQueue;
|
||||
import org.apache.solr.common.util.Cache;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -43,7 +44,7 @@ import java.lang.ref.WeakReference;
|
|||
*
|
||||
* @since solr 1.4
|
||||
*/
|
||||
public class ConcurrentLRUCache<K,V> {
|
||||
public class ConcurrentLRUCache<K,V> implements Cache<K,V> {
|
||||
private static Logger log = LoggerFactory.getLogger(ConcurrentLRUCache.class);
|
||||
|
||||
private final ConcurrentHashMap<Object, CacheEntry<K,V>> map;
|
||||
|
@ -85,6 +86,7 @@ public class ConcurrentLRUCache<K,V> {
|
|||
islive = live;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V get(K key) {
|
||||
CacheEntry<K,V> e = map.get(key);
|
||||
if (e == null) {
|
||||
|
@ -95,6 +97,7 @@ public class ConcurrentLRUCache<K,V> {
|
|||
return e.value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V remove(K key) {
|
||||
CacheEntry<K,V> cacheEntry = map.remove(key);
|
||||
if (cacheEntry != null) {
|
||||
|
@ -104,6 +107,7 @@ public class ConcurrentLRUCache<K,V> {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V put(K key, V val) {
|
||||
if (val == null) return null;
|
||||
CacheEntry<K,V> e = new CacheEntry<>(key, val, stats.accessCounter.incrementAndGet());
|
||||
|
@ -468,6 +472,7 @@ public class ConcurrentLRUCache<K,V> {
|
|||
return stats.size.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
map.clear();
|
||||
}
|
||||
|
@ -623,8 +628,8 @@ public class ConcurrentLRUCache<K,V> {
|
|||
@Override
|
||||
protected void finalize() throws Throwable {
|
||||
try {
|
||||
if(!isDestroyed){
|
||||
log.error("ConcurrentLRUCache was not destroyed prior to finalize(), indicates a bug -- POSSIBLE RESOURCE LEAK!!!");
|
||||
if(!isDestroyed && (cleanupThread != null)){
|
||||
log.error("ConcurrentLRUCache created with a thread and was not destroyed prior to finalize(), indicates a bug -- POSSIBLE RESOURCE LEAK!!!");
|
||||
destroy();
|
||||
}
|
||||
} finally {
|
||||
|
|
|
@ -31,7 +31,14 @@ import java.io.Reader;
|
|||
*/
|
||||
public class BinaryResponseParser extends ResponseParser {
|
||||
public static final String BINARY_CONTENT_TYPE = "application/octet-stream";
|
||||
|
||||
|
||||
private JavaBinCodec.StringCache stringCache;
|
||||
|
||||
public BinaryResponseParser setStringCache(JavaBinCodec.StringCache cache) {
|
||||
this.stringCache = cache;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriterType() {
|
||||
return "javabin";
|
||||
|
@ -40,7 +47,7 @@ public class BinaryResponseParser extends ResponseParser {
|
|||
@Override
|
||||
public NamedList<Object> processResponse(InputStream body, String encoding) {
|
||||
try {
|
||||
return (NamedList<Object>) new JavaBinCodec().unmarshal(body);
|
||||
return (NamedList<Object>) new JavaBinCodec(null,stringCache).unmarshal(body);
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "parsing error", e);
|
||||
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
package org.apache.solr.common.util;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
public interface Cache<K, V> {
|
||||
public V put(K key, V val);
|
||||
|
||||
public V get(K key);
|
||||
|
||||
public V remove(K key);
|
||||
|
||||
public void clear();
|
||||
|
||||
}
|
|
@ -81,12 +81,18 @@ public class JavaBinCodec {
|
|||
private static byte VERSION = 2;
|
||||
private ObjectResolver resolver;
|
||||
protected FastOutputStream daos;
|
||||
private StringCache stringCache;
|
||||
|
||||
public JavaBinCodec() {
|
||||
}
|
||||
|
||||
public JavaBinCodec(ObjectResolver resolver) {
|
||||
this(resolver, null);
|
||||
}
|
||||
|
||||
public JavaBinCodec(ObjectResolver resolver, StringCache stringCache) {
|
||||
this.resolver = resolver;
|
||||
this.stringCache = stringCache;
|
||||
}
|
||||
|
||||
public void marshal(Object nl, OutputStream os) throws IOException {
|
||||
|
@ -588,15 +594,23 @@ public class JavaBinCodec {
|
|||
|
||||
byte[] bytes;
|
||||
CharArr arr = new CharArr();
|
||||
private StringBytes bytesRef = new StringBytes(bytes,0,0);
|
||||
|
||||
public String readStr(DataInputInputStream dis) throws IOException {
|
||||
return readStr(dis,null);
|
||||
}
|
||||
|
||||
public String readStr(DataInputInputStream dis, StringCache stringCache) throws IOException {
|
||||
int sz = readSize(dis);
|
||||
if (bytes == null || bytes.length < sz) bytes = new byte[sz];
|
||||
dis.readFully(bytes, 0, sz);
|
||||
|
||||
arr.reset();
|
||||
ByteUtils.UTF8toUTF16(bytes, 0, sz, arr);
|
||||
return arr.toString();
|
||||
if (stringCache != null) {
|
||||
return stringCache.get(bytesRef.reset(bytes, 0, sz));
|
||||
}else {
|
||||
arr.reset();
|
||||
ByteUtils.UTF8toUTF16(bytes, 0, sz, arr);
|
||||
return arr.toString();
|
||||
}
|
||||
}
|
||||
|
||||
public void writeInt(int val) throws IOException {
|
||||
|
@ -804,7 +818,8 @@ public class JavaBinCodec {
|
|||
if (idx != 0) {// idx != 0 is the index of the extern string
|
||||
return stringsList.get(idx - 1);
|
||||
} else {// idx == 0 means it has a string value
|
||||
String s = (String) readVal(fis);
|
||||
tagByte = fis.readByte();
|
||||
String s = readStr(fis, stringCache);
|
||||
if (stringsList == null) stringsList = new ArrayList<>();
|
||||
stringsList.add(s);
|
||||
return s;
|
||||
|
@ -816,5 +831,77 @@ public class JavaBinCodec {
|
|||
public Object resolve(Object o, JavaBinCodec codec) throws IOException;
|
||||
}
|
||||
|
||||
public static class StringCache {
|
||||
private final Cache<StringBytes,String> cache ;
|
||||
|
||||
public StringCache(Cache<StringBytes, String> cache) {
|
||||
this.cache = cache;
|
||||
}
|
||||
|
||||
public String get(StringBytes b){
|
||||
String result = cache.get(b);
|
||||
if(result== null){
|
||||
//make a copy because the buffer received may be changed later by the caller
|
||||
StringBytes copy = new StringBytes(Arrays.copyOfRange(b.bytes, b.offset, b.offset + b.length), 0,b.length);
|
||||
CharArr arr = new CharArr();
|
||||
ByteUtils.UTF8toUTF16(b.bytes, b.offset, b.length, arr);
|
||||
result = arr.toString();
|
||||
cache.put(copy,result);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
public static class StringBytes {
|
||||
byte[] bytes;
|
||||
|
||||
/** Offset of first valid byte. */
|
||||
int offset;
|
||||
|
||||
/** Length of used bytes. */
|
||||
private int length;
|
||||
private int hash;
|
||||
public StringBytes(byte[] bytes, int offset, int length) {
|
||||
reset(bytes,offset,length);
|
||||
}
|
||||
StringBytes reset(byte[] bytes, int offset, int length){
|
||||
this.bytes = bytes;
|
||||
this.offset = offset;
|
||||
this.length = length;
|
||||
hash = bytes == null ? 0 : Hash.murmurhash3_x86_32(bytes, offset, length, 0);
|
||||
return this;
|
||||
}
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null) {
|
||||
return false;
|
||||
}
|
||||
if (other instanceof StringBytes) {
|
||||
return this.bytesEquals((StringBytes) other);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean bytesEquals(StringBytes other) {
|
||||
assert other != null;
|
||||
if (length == other.length) {
|
||||
int otherUpto = other.offset;
|
||||
final byte[] otherBytes = other.bytes;
|
||||
final int end = offset + length;
|
||||
for(int upto=offset;upto<end;upto++,otherUpto++) {
|
||||
if (bytes[upto] != otherBytes[otherUpto]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return hash;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -39,7 +39,10 @@ import org.apache.solr.common.SolrDocument;
|
|||
import org.apache.solr.common.SolrDocumentList;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.SolrInputField;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.util.ConcurrentLRUCache;
|
||||
import org.junit.Test;
|
||||
import org.noggit.CharArr;
|
||||
|
||||
public class TestJavaBinCodec extends SolrTestCaseJ4 {
|
||||
|
||||
|
@ -300,6 +303,61 @@ public class TestJavaBinCodec extends SolrTestCaseJ4 {
|
|||
assertFalse(grandChildDocuments.get(0).hasChildDocuments());
|
||||
assertNull(grandChildDocuments.get(0).getChildDocuments());
|
||||
}
|
||||
@Test
|
||||
public void testStringCaching() throws Exception {
|
||||
Map<String, Object> m = ZkNodeProps.makeMap("key1", "val1", "key2", "val2");
|
||||
|
||||
ByteArrayOutputStream os1 = new ByteArrayOutputStream();
|
||||
new JavaBinCodec().marshal(m, os1);
|
||||
Map m1 = (Map) new JavaBinCodec().unmarshal(new ByteArrayInputStream(os1.toByteArray()));
|
||||
ByteArrayOutputStream os2 = new ByteArrayOutputStream();
|
||||
new JavaBinCodec().marshal(m, os2);
|
||||
Map m2 = (Map) new JavaBinCodec().unmarshal(new ByteArrayInputStream(os2.toByteArray()));
|
||||
List l1 = new ArrayList<>(m1.keySet());
|
||||
List l2 = new ArrayList<>(m2.keySet());
|
||||
|
||||
assertTrue(l1.get(0).equals(l2.get(0)));
|
||||
assertFalse(l1.get(0) == l2.get(0));
|
||||
assertTrue(l1.get(1).equals(l2.get(1)));
|
||||
assertFalse(l1.get(1) == l2.get(1));
|
||||
|
||||
JavaBinCodec.StringCache stringCache = new JavaBinCodec.StringCache(new Cache<JavaBinCodec.StringBytes, String>() {
|
||||
private HashMap<JavaBinCodec.StringBytes, String> cache = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public String put(JavaBinCodec.StringBytes key, String val) {
|
||||
return cache.put(key, val);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String get(JavaBinCodec.StringBytes key) {
|
||||
return cache.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String remove(JavaBinCodec.StringBytes key) {
|
||||
return cache.remove(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
cache.clear();
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
m1 = (Map) new JavaBinCodec(null, stringCache).unmarshal(new ByteArrayInputStream(os1.toByteArray()));
|
||||
m2 = (Map) new JavaBinCodec(null, stringCache).unmarshal(new ByteArrayInputStream(os2.toByteArray()));
|
||||
l1 = new ArrayList<>(m1.keySet());
|
||||
l2 = new ArrayList<>(m2.keySet());
|
||||
assertTrue(l1.get(0).equals(l2.get(0)));
|
||||
assertTrue(l1.get(0) == l2.get(0));
|
||||
assertTrue(l1.get(1).equals(l2.get(1)));
|
||||
assertTrue(l1.get(1) == l2.get(1));
|
||||
|
||||
|
||||
}
|
||||
|
||||
public void genBinaryFiles() throws IOException {
|
||||
JavaBinCodec javabin = new JavaBinCodec();
|
||||
|
@ -326,6 +384,122 @@ public class TestJavaBinCodec extends SolrTestCaseJ4 {
|
|||
|
||||
}
|
||||
|
||||
private void testPerf() throws InterruptedException {
|
||||
final ArrayList<JavaBinCodec.StringBytes> l = new ArrayList<>();
|
||||
Cache<JavaBinCodec.StringBytes, String> cache = null;
|
||||
/* cache = new ConcurrentLRUCache<JavaBinCodec.StringBytes,String>(10000, 9000, 10000, 1000, false, true, null){
|
||||
@Override
|
||||
public String put(JavaBinCodec.StringBytes key, String val) {
|
||||
l.add(key);
|
||||
return super.put(key, val);
|
||||
}
|
||||
};*/
|
||||
Runtime.getRuntime().gc();
|
||||
printMem("before cache init");
|
||||
|
||||
Cache<JavaBinCodec.StringBytes, String> cache1 = new Cache<JavaBinCodec.StringBytes, String>() {
|
||||
private HashMap<JavaBinCodec.StringBytes, String> cache = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public String put(JavaBinCodec.StringBytes key, String val) {
|
||||
l.add(key);
|
||||
return cache.put(key, val);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String get(JavaBinCodec.StringBytes key) {
|
||||
return cache.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String remove(JavaBinCodec.StringBytes key) {
|
||||
return cache.remove(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
cache.clear();
|
||||
|
||||
}
|
||||
};
|
||||
JavaBinCodec.StringCache STRING_CACHE = new JavaBinCodec.StringCache(cache1);
|
||||
|
||||
// STRING_CACHE = new JavaBinCodec.StringCache(cache);
|
||||
byte[] bytes = new byte[0];
|
||||
JavaBinCodec.StringBytes stringBytes = new JavaBinCodec.StringBytes(null,0,0);
|
||||
|
||||
for(int i=0;i<10000;i++) {
|
||||
String s = String.valueOf(random().nextLong());
|
||||
int end = s.length();
|
||||
int maxSize = end * 4;
|
||||
if (bytes == null || bytes.length < maxSize) bytes = new byte[maxSize];
|
||||
int sz = ByteUtils.UTF16toUTF8(s, 0, end, bytes, 0);
|
||||
STRING_CACHE.get(stringBytes.reset(bytes, 0, sz));
|
||||
}
|
||||
printMem("after cache init");
|
||||
|
||||
long ms = System.currentTimeMillis();
|
||||
int ITERS = 1000000;
|
||||
int THREADS = 10;
|
||||
|
||||
runInThreads(THREADS, () -> {
|
||||
JavaBinCodec.StringBytes stringBytes1 = new JavaBinCodec.StringBytes(new byte[0], 0,0);
|
||||
for(int i=0;i< ITERS;i++){
|
||||
JavaBinCodec.StringBytes b = l.get(i % l.size());
|
||||
stringBytes1.reset(b.bytes,0,b.bytes.length);
|
||||
if(STRING_CACHE.get(stringBytes1) == null) throw new RuntimeException("error");
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
|
||||
|
||||
printMem("after cache test");
|
||||
System.out.println("time taken by LRUCACHE "+ (System.currentTimeMillis()-ms));
|
||||
ms = System.currentTimeMillis();
|
||||
|
||||
runInThreads(THREADS, ()-> {
|
||||
String a = null;
|
||||
CharArr arr = new CharArr();
|
||||
for (int i = 0; i < ITERS; i++) {
|
||||
JavaBinCodec.StringBytes sb = l.get(i % l.size());
|
||||
arr.reset();
|
||||
ByteUtils.UTF8toUTF16(sb.bytes, 0, sb.bytes.length, arr);
|
||||
a = arr.toString();
|
||||
}
|
||||
});
|
||||
|
||||
printMem("after new string test");
|
||||
System.out.println("time taken by string creation "+ (System.currentTimeMillis()-ms));
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
private void runInThreads(int count, Runnable runnable) throws InterruptedException {
|
||||
ArrayList<Thread> t =new ArrayList();
|
||||
for(int i=0;i<count;i++ ) t.add(new Thread(runnable));
|
||||
for (Thread thread : t) thread.start();
|
||||
for (Thread thread : t) thread.join();
|
||||
}
|
||||
|
||||
static void printMem(String head) {
|
||||
System.out.println("*************" + head + "***********");
|
||||
int mb = 1024*1024;
|
||||
//Getting the runtime reference from system
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
//Print used memory
|
||||
System.out.println("Used Memory:"
|
||||
+ (runtime.totalMemory() - runtime.freeMemory()) / mb);
|
||||
|
||||
//Print free memory
|
||||
System.out.println("Free Memory:"
|
||||
+ runtime.freeMemory() / mb);
|
||||
|
||||
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
TestJavaBinCodec test = new TestJavaBinCodec();
|
||||
test.genBinaryFiles();
|
||||
|
|
Loading…
Reference in New Issue