SOLR-8719 renamed classes to make their usage generic

This commit is contained in:
Noble Paul 2016-02-23 16:14:49 +05:30
parent cf96432630
commit 4d9d0c0011
3 changed files with 81 additions and 49 deletions

View File

@ -21,6 +21,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
@ -46,13 +47,14 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.handler.admin.CollectionsHandler; import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.util.CryptoKeys; import org.apache.solr.util.CryptoKeys;
import org.apache.solr.util.SimplePostTool; import org.apache.solr.util.SimplePostTool;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* The purpose of this class is to store the Jars loaded in memory and to keep only one copy of the Jar in a single node. * The purpose of this class is to store the Jars loaded in memory and to keep only one copy of the Jar in a single node.
*/ */
public class JarRepository { public class BlobRepository {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
static final Random RANDOM; static final Random RANDOM;
@ -68,21 +70,44 @@ public class JarRepository {
} }
private final CoreContainer coreContainer; private final CoreContainer coreContainer;
private Map<String, JarContent> jars = new ConcurrentHashMap<>(); private Map<String, BlobContent> blobs = new ConcurrentHashMap<>();
public JarRepository(CoreContainer coreContainer) { public BlobRepository(CoreContainer coreContainer) {
this.coreContainer = coreContainer; this.coreContainer = coreContainer;
} }
public static ByteBuffer getFileContent(BlobContent blobContent, String entryName) throws IOException {
ByteArrayInputStream zipContents = new ByteArrayInputStream(blobContent.buffer.array(), blobContent.buffer.arrayOffset(), blobContent.buffer.limit());
ZipInputStream zis = new ZipInputStream(zipContents);
try {
ZipEntry entry;
while ((entry = zis.getNextEntry()) != null) {
if (entryName == null || entryName.equals(entry.getName())) {
SimplePostTool.BAOS out = new SimplePostTool.BAOS();
byte[] buffer = new byte[2048];
int size;
while ((size = zis.read(buffer, 0, buffer.length)) != -1) {
out.write(buffer, 0, size);
}
out.close();
return out.getByteBuffer();
}
}
} finally {
zis.closeEntry();
}
return null;
}
/** /**
* Returns the contents of a jar and increments a reference count. Please return the same object to decrease the refcount * Returns the contents of a jar and increments a reference count. Please return the same object to decrease the refcount
* *
* @param key it is a combination of blobname and version like blobName/version * @param key it is a combination of blobname and version like blobName/version
* @return The reference of a jar * @return The reference of a jar
*/ */
public JarContentRef getJarIncRef(String key) { public BlobContentRef getBlobIncRef(String key) {
JarContent jar = jars.get(key); BlobContent aBlob = blobs.get(key);
if (jar == null) { if (aBlob == null) {
if (this.coreContainer.isZooKeeperAware()) { if (this.coreContainer.isZooKeeperAware()) {
Replica replica = getSystemCollReplica(); Replica replica = getSystemCollReplica();
String url = replica.getStr(BASE_URL_PROP) + "/.system/blob/" + key + "?wt=filestream"; String url = replica.getStr(BASE_URL_PROP) + "/.system/blob/" + key + "?wt=filestream";
@ -106,7 +131,7 @@ public class JarRepository {
} finally { } finally {
httpGet.releaseConnection(); httpGet.releaseConnection();
} }
jars.put(key, jar = new JarContent(key, b)); blobs.put(key, aBlob = new BlobContent(key, b));
} else { } else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Jar loading is not supported in non-cloud mode"); throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Jar loading is not supported in non-cloud mode");
// todo // todo
@ -114,9 +139,9 @@ public class JarRepository {
} }
JarContentRef ref = new JarContentRef(jar); BlobContentRef ref = new BlobContentRef(aBlob);
synchronized (jar.references) { synchronized (aBlob.references) {
jar.references.add(ref); aBlob.references.add(ref);
} }
return ref; return ref;
@ -157,52 +182,54 @@ public class JarRepository {
* *
* @param ref The reference that is already there. Doing multiple calls with same ref will not matter * @param ref The reference that is already there. Doing multiple calls with same ref will not matter
*/ */
public void decrementJarRefCount(JarContentRef ref) { public void decrementBlobRefCount(BlobContentRef ref) {
if (ref == null) return; if (ref == null) return;
synchronized (ref.jar.references) { synchronized (ref.blob.references) {
if (!ref.jar.references.remove(ref)) { if (!ref.blob.references.remove(ref)) {
log.error("Multiple releases for the same reference"); log.error("Multiple releases for the same reference");
} }
if (ref.jar.references.isEmpty()) { if (ref.blob.references.isEmpty()) {
jars.remove(ref.jar.key); blobs.remove(ref.blob.key);
} }
} }
} }
public static class JarContent { public static class BlobContent {
private final String key; private final String key;
private Map<String, Object> decodedObjects = null;
// TODO move this off-heap // TODO move this off-heap
private final ByteBuffer buffer; private final ByteBuffer buffer;
// ref counting mechanism // ref counting mechanism
private final Set<JarContentRef> references = new HashSet<>(); private final Set<BlobContentRef> references = new HashSet<>();
public JarContent(String key, ByteBuffer buffer) {
public BlobContent(String key, ByteBuffer buffer) {
this.key = key; this.key = key;
this.buffer = buffer; this.buffer = buffer;
} }
public ByteBuffer getFileContent(String entryName) throws IOException { /**
ByteArrayInputStream zipContents = new ByteArrayInputStream(buffer.array(), buffer.arrayOffset(), buffer.limit()); * This method decodes the byte[] to a custom Object
ZipInputStream zis = new ZipInputStream(zipContents); *
try { * @param key The key is used to store the decoded Object. it is possible to have multiple
ZipEntry entry; * decoders for the same blob (may be unusual).
while ((entry = zis.getNextEntry()) != null) { * @param decoder A decoder instance
if (entryName == null || entryName.equals(entry.getName())) { * @return the decoded Object . If it was already decoded, then return from the cache
SimplePostTool.BAOS out = new SimplePostTool.BAOS(); */
byte[] buffer = new byte[2048]; public <T> T decodeAndCache(String key, Decoder<T> decoder) {
int size; if (decodedObjects == null) {
while ((size = zis.read(buffer, 0, buffer.length)) != -1) { synchronized (this) {
out.write(buffer, 0, size); if (decodedObjects == null) decodedObjects = new ConcurrentHashMap<>();
}
out.close();
return out.getByteBuffer();
}
} }
} finally {
zis.closeEntry();
} }
return null;
Object t = decodedObjects.get(key);
if (t != null) return (T) t;
t = decoder.decode(new ByteBufferInputStream(buffer));
decodedObjects.put(key, t);
return (T) t;
} }
public String checkSignature(String base64Sig, CryptoKeys keys) { public String checkSignature(String base64Sig, CryptoKeys keys) {
@ -211,11 +238,16 @@ public class JarRepository {
} }
public static class JarContentRef { public interface Decoder<T> {
public final JarContent jar;
private JarContentRef(JarContent jar) { T decode(InputStream inputStream);
this.jar = jar; }
public static class BlobContentRef {
public final BlobContent blob;
private BlobContentRef(BlobContent blob) {
this.blob = blob;
} }
} }

View File

@ -134,7 +134,7 @@ public class CoreContainer {
private String hostName; private String hostName;
private final JarRepository jarRepository = new JarRepository(this); private final BlobRepository blobRepository = new BlobRepository(this);
private PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null); private PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
@ -1098,8 +1098,8 @@ public class CoreContainer {
return core; return core;
} }
public JarRepository getJarRepository(){ public BlobRepository getBlobRepository(){
return jarRepository; return blobRepository;
} }
/** /**

View File

@ -386,7 +386,7 @@ public class PluginBag<T> implements AutoCloseable {
*/ */
public static class RuntimeLib implements PluginInfoInitialized, AutoCloseable { public static class RuntimeLib implements PluginInfoInitialized, AutoCloseable {
private String name, version, sig; private String name, version, sig;
private JarRepository.JarContentRef jarContent; private BlobRepository.BlobContentRef jarContent;
private final CoreContainer coreContainer; private final CoreContainer coreContainer;
private boolean verified = false; private boolean verified = false;
@ -410,7 +410,7 @@ public class PluginBag<T> implements AutoCloseable {
if (jarContent != null) return; if (jarContent != null) return;
synchronized (this) { synchronized (this) {
if (jarContent != null) return; if (jarContent != null) return;
jarContent = coreContainer.getJarRepository().getJarIncRef(name + "/" + version); jarContent = coreContainer.getBlobRepository().getBlobIncRef(name + "/" + version);
} }
} }
@ -430,13 +430,13 @@ public class PluginBag<T> implements AutoCloseable {
public ByteBuffer getFileContent(String entryName) throws IOException { public ByteBuffer getFileContent(String entryName) throws IOException {
if (jarContent == null) if (jarContent == null)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "jar not available: " + name + "/" + version); throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "jar not available: " + name + "/" + version);
return jarContent.jar.getFileContent(entryName); return BlobRepository.getFileContent(jarContent.blob, entryName);
} }
@Override @Override
public void close() throws Exception { public void close() throws Exception {
if (jarContent != null) coreContainer.getJarRepository().decrementJarRefCount(jarContent); if (jarContent != null) coreContainer.getBlobRepository().decrementBlobRefCount(jarContent);
} }
public static List<RuntimeLib> getLibObjects(SolrCore core, List<PluginInfo> libs) { public static List<RuntimeLib> getLibObjects(SolrCore core, List<PluginInfo> libs) {
@ -472,7 +472,7 @@ public class PluginBag<T> implements AutoCloseable {
} }
try { try {
String matchedKey = jarContent.jar.checkSignature(sig, new CryptoKeys(keys)); String matchedKey = jarContent.blob.checkSignature(sig, new CryptoKeys(keys));
if (matchedKey == null) if (matchedKey == null)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "No key matched signature for jar : " + name + " version: " + version); throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "No key matched signature for jar : " + name + " version: " + version);
log.info("Jar {} signed with {} successfully verified", name, matchedKey); log.info("Jar {} signed with {} successfully verified", name, matchedKey);