mirror of https://github.com/apache/lucene.git
SOLR-8349: Allow sharing of large in memory data structures across cores
This commit is contained in:
parent
4751b83c97
commit
9a1880aee8
|
@ -96,6 +96,9 @@ New Features
|
|||
* SOLR-8962: Add sort Streaming Expression. The expression takes a single input stream and a
|
||||
comparator and outputs tuples in stable order of the comparator. (Dennis Gove)
|
||||
|
||||
* SOLR-8349: Allow sharing of large in memory data structures across cores (Gus Heck, noble)
|
||||
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -31,9 +31,9 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipInputStream;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.HttpClient;
|
||||
|
@ -46,18 +46,21 @@ import org.apache.solr.common.cloud.Replica;
|
|||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.handler.admin.CollectionsHandler;
|
||||
import org.apache.solr.util.CryptoKeys;
|
||||
import org.apache.solr.util.SimplePostTool;
|
||||
import org.apache.zookeeper.server.ByteBufferInputStream;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.SolrException.ErrorCode.SERVICE_UNAVAILABLE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
|
||||
|
||||
/**
|
||||
* The purpose of this class is to store the Jars loaded in memory and to keep only one copy of the Jar in a single node.
|
||||
*/
|
||||
public class BlobRepository {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
static final Random RANDOM;
|
||||
static final Pattern BLOB_KEY_PATTERN_CHECKER = Pattern.compile(".*/\\d+");
|
||||
|
||||
static {
|
||||
// We try to make things reproducible in the context of our tests by initializing the random instance
|
||||
|
@ -71,81 +74,113 @@ public class BlobRepository {
|
|||
}
|
||||
|
||||
private final CoreContainer coreContainer;
|
||||
private Map<String, BlobContent> blobs = new ConcurrentHashMap<>();
|
||||
private Map<String, BlobContent> blobs = createMap();
|
||||
|
||||
// for unit tests to override
|
||||
ConcurrentHashMap<String, BlobContent> createMap() {
|
||||
return new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
public BlobRepository(CoreContainer coreContainer) {
|
||||
this.coreContainer = coreContainer;
|
||||
}
|
||||
|
||||
public static ByteBuffer getFileContent(BlobContent blobContent, String entryName) throws IOException {
|
||||
ByteArrayInputStream zipContents = new ByteArrayInputStream(blobContent.buffer.array(), blobContent.buffer.arrayOffset(), blobContent.buffer.limit());
|
||||
ZipInputStream zis = new ZipInputStream(zipContents);
|
||||
try {
|
||||
ZipEntry entry;
|
||||
while ((entry = zis.getNextEntry()) != null) {
|
||||
if (entryName == null || entryName.equals(entry.getName())) {
|
||||
SimplePostTool.BAOS out = new SimplePostTool.BAOS();
|
||||
byte[] buffer = new byte[2048];
|
||||
int size;
|
||||
while ((size = zis.read(buffer, 0, buffer.length)) != -1) {
|
||||
out.write(buffer, 0, size);
|
||||
}
|
||||
out.close();
|
||||
return out.getByteBuffer();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
zis.closeEntry();
|
||||
}
|
||||
return null;
|
||||
// I wanted to {@link SolrCore#loadDecodeAndCacheBlob(String, Decoder)} below but precommit complains
|
||||
/**
|
||||
* Returns the contents of a blob containing a ByteBuffer and increments a reference count. Please return the
|
||||
* same object to decrease the refcount. This is normally used for storing jar files, and binary raw data.
|
||||
* If you are caching Java Objects you want to use {@code SolrCore#loadDecodeAndCacheBlob(String, Decoder)}
|
||||
*
|
||||
* @param key it is a combination of blobname and version like blobName/version
|
||||
* @return The reference of a blob
|
||||
*/
|
||||
public BlobContentRef<ByteBuffer> getBlobIncRef(String key) {
|
||||
return getBlobIncRef(key, () -> addBlob(key));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the contents of a jar and increments a reference count. Please return the same object to decrease the refcount
|
||||
* Internal method that returns the contents of a blob and increments a reference count. Please return the same
|
||||
* object to decrease the refcount. Only the decoded content will be cached when this method is used. Component
|
||||
* authors attempting to share objects across cores should use
|
||||
* {@code SolrCore#loadDecodeAndCacheBlob(String, Decoder)} which ensures that a proper close hook is also created.
|
||||
*
|
||||
* @param key it is a combination of blobname and version like blobName/version
|
||||
* @return The reference of a jar
|
||||
* @param key it is a combination of blob name and version like blobName/version
|
||||
* @param decoder a decoder that knows how to interpret the bytes from the blob
|
||||
* @return The reference of a blob
|
||||
*/
|
||||
public BlobContentRef getBlobIncRef(String key) {
|
||||
BlobContent aBlob = blobs.get(key);
|
||||
if (aBlob == null) {
|
||||
if (this.coreContainer.isZooKeeperAware()) {
|
||||
Replica replica = getSystemCollReplica();
|
||||
String url = replica.getStr(BASE_URL_PROP) + "/.system/blob/" + key + "?wt=filestream";
|
||||
BlobContentRef<Object> getBlobIncRef(String key, Decoder<Object> decoder) {
|
||||
return getBlobIncRef(key.concat(decoder.getName()), () -> addBlob(key,decoder));
|
||||
}
|
||||
|
||||
HttpClient httpClient = coreContainer.getUpdateShardHandler().getHttpClient();
|
||||
HttpGet httpGet = new HttpGet(url);
|
||||
ByteBuffer b;
|
||||
try {
|
||||
HttpResponse entity = httpClient.execute(httpGet, HttpClientUtil.createNewHttpClientRequestContext());
|
||||
int statusCode = entity.getStatusLine().getStatusCode();
|
||||
if (statusCode != 200) {
|
||||
throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "no such blob or version available: " + key);
|
||||
// do the actual work returning the appropriate type...
|
||||
private <T> BlobContentRef<T> getBlobIncRef(String key, Callable<BlobContent<T>> blobCreator) {
|
||||
BlobContent<T> aBlob;
|
||||
if (this.coreContainer.isZooKeeperAware()) {
|
||||
synchronized (blobs) {
|
||||
aBlob = blobs.get(key);
|
||||
if (aBlob == null) {
|
||||
try {
|
||||
aBlob = blobCreator.call();
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Blob loading failed: "+e.getMessage(), e);
|
||||
}
|
||||
b = SimplePostTool.inputStreamToByteArray(entity.getEntity().getContent());
|
||||
} catch (Exception e) {
|
||||
if (e instanceof SolrException) {
|
||||
throw (SolrException) e;
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "could not load : " + key, e);
|
||||
}
|
||||
} finally {
|
||||
httpGet.releaseConnection();
|
||||
}
|
||||
blobs.put(key, aBlob = new BlobContent(key, b));
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Jar loading is not supported in non-cloud mode");
|
||||
// todo
|
||||
}
|
||||
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Blob loading is not supported in non-cloud mode");
|
||||
// todo
|
||||
}
|
||||
|
||||
BlobContentRef ref = new BlobContentRef(aBlob);
|
||||
BlobContentRef<T> ref = new BlobContentRef<>(aBlob);
|
||||
synchronized (aBlob.references) {
|
||||
aBlob.references.add(ref);
|
||||
}
|
||||
return ref;
|
||||
}
|
||||
|
||||
// For use cases sharing raw bytes
|
||||
private BlobContent<ByteBuffer> addBlob(String key) {
|
||||
ByteBuffer b = fetchBlob(key);
|
||||
BlobContent<ByteBuffer> aBlob = new BlobContent<>(key, b);
|
||||
blobs.put(key, aBlob);
|
||||
return aBlob;
|
||||
}
|
||||
|
||||
// for use cases sharing java objects
|
||||
private BlobContent<Object> addBlob(String key, Decoder<Object> decoder) {
|
||||
ByteBuffer b = fetchBlob(key);
|
||||
String keyPlusName = key + decoder.getName();
|
||||
BlobContent<Object> aBlob = new BlobContent<>(keyPlusName, b, decoder);
|
||||
blobs.put(keyPlusName, aBlob);
|
||||
return aBlob;
|
||||
}
|
||||
|
||||
/**
|
||||
* Package local for unit tests only please do not use elsewhere
|
||||
*/
|
||||
ByteBuffer fetchBlob(String key) {
|
||||
Replica replica = getSystemCollReplica();
|
||||
String url = replica.getStr(BASE_URL_PROP) + "/.system/blob/" + key + "?wt=filestream";
|
||||
|
||||
HttpClient httpClient = coreContainer.getUpdateShardHandler().getHttpClient();
|
||||
HttpGet httpGet = new HttpGet(url);
|
||||
ByteBuffer b;
|
||||
try {
|
||||
HttpResponse entity = httpClient.execute(httpGet);
|
||||
int statusCode = entity.getStatusLine().getStatusCode();
|
||||
if (statusCode != 200) {
|
||||
throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "no such blob or version available: " + key);
|
||||
}
|
||||
b = SimplePostTool.inputStreamToByteArray(entity.getEntity().getContent());
|
||||
} catch (Exception e) {
|
||||
if (e instanceof SolrException) {
|
||||
throw (SolrException) e;
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "could not load : " + key, e);
|
||||
}
|
||||
} finally {
|
||||
httpGet.releaseConnection();
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
private Replica getSystemCollReplica() {
|
||||
|
@ -193,61 +228,60 @@ public class BlobRepository {
|
|||
blobs.remove(ref.blob.key);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class BlobContent {
|
||||
private final String key;
|
||||
private Map<String, Object> decodedObjects = null;
|
||||
// TODO move this off-heap
|
||||
private final ByteBuffer buffer;
|
||||
public static class BlobContent<T> {
|
||||
public final String key;
|
||||
private final T content; // holds byte buffer or cached object, holding both is a waste of memory
|
||||
// ref counting mechanism
|
||||
private final Set<BlobContentRef> references = new HashSet<>();
|
||||
|
||||
public BlobContent(String key, ByteBuffer buffer, Decoder<T> decoder) {
|
||||
this.key = key;
|
||||
this.content = decoder.decode(new ByteBufferInputStream(buffer));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public BlobContent(String key, ByteBuffer buffer) {
|
||||
this.key = key;
|
||||
this.buffer = buffer;
|
||||
this.content = (T) buffer;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method decodes the byte[] to a custom Object
|
||||
* Get the cached object.
|
||||
*
|
||||
* @param key The key is used to store the decoded Object. it is possible to have multiple
|
||||
* decoders for the same blob (may be unusual).
|
||||
* @param decoder A decoder instance
|
||||
* @return the decoded Object . If it was already decoded, then return from the cache
|
||||
* @return the object representing the content that is cached.
|
||||
*/
|
||||
public <T> T decodeAndCache(String key, Decoder<T> decoder) {
|
||||
if (decodedObjects == null) {
|
||||
synchronized (this) {
|
||||
if (decodedObjects == null) decodedObjects = new ConcurrentHashMap<>();
|
||||
}
|
||||
}
|
||||
|
||||
Object t = decodedObjects.get(key);
|
||||
if (t != null) return (T) t;
|
||||
t = decoder.decode(new ByteBufferInputStream(buffer));
|
||||
decodedObjects.put(key, t);
|
||||
return (T) t;
|
||||
|
||||
}
|
||||
|
||||
public String checkSignature(String base64Sig, CryptoKeys keys) {
|
||||
return keys.verify(base64Sig, buffer);
|
||||
public T get() {
|
||||
return this.content;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public interface Decoder<T> {
|
||||
|
||||
/**
|
||||
* A name by which to distinguish this decoding. This only needs to be implemented if you want to support
|
||||
* decoding the same blob content with more than one decoder.
|
||||
*
|
||||
* @return The name of the decoding, defaults to empty string.
|
||||
*/
|
||||
default String getName() { return ""; }
|
||||
|
||||
/**
|
||||
* A routine that knows how to convert the stream of bytes from the blob into a Java object.
|
||||
*
|
||||
* @param inputStream the bytes from a blob
|
||||
* @return A Java object of the specified type.
|
||||
*/
|
||||
T decode(InputStream inputStream);
|
||||
}
|
||||
|
||||
public static class BlobContentRef {
|
||||
public final BlobContent blob;
|
||||
|
||||
private BlobContentRef(BlobContent blob) {
|
||||
public static class BlobContentRef<T> {
|
||||
public final BlobContent<T> blob;
|
||||
|
||||
private BlobContentRef(BlobContent<T> blob) {
|
||||
this.blob = blob;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.solr.core;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -28,6 +29,8 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipInputStream;
|
||||
|
||||
import org.apache.lucene.analysis.util.ResourceLoader;
|
||||
import org.apache.lucene.analysis.util.ResourceLoaderAware;
|
||||
|
@ -38,6 +41,7 @@ import org.apache.solr.handler.RequestHandlerBase;
|
|||
import org.apache.solr.handler.component.SearchComponent;
|
||||
import org.apache.solr.request.SolrRequestHandler;
|
||||
import org.apache.solr.util.CryptoKeys;
|
||||
import org.apache.solr.util.SimplePostTool;
|
||||
import org.apache.solr.util.plugin.NamedListInitializedPlugin;
|
||||
import org.apache.solr.util.plugin.PluginInfoInitialized;
|
||||
import org.apache.solr.util.plugin.SolrCoreAware;
|
||||
|
@ -386,7 +390,7 @@ public class PluginBag<T> implements AutoCloseable {
|
|||
*/
|
||||
public static class RuntimeLib implements PluginInfoInitialized, AutoCloseable {
|
||||
private String name, version, sig;
|
||||
private BlobRepository.BlobContentRef jarContent;
|
||||
private BlobRepository.BlobContentRef<ByteBuffer> jarContent;
|
||||
private final CoreContainer coreContainer;
|
||||
private boolean verified = false;
|
||||
|
||||
|
@ -430,10 +434,35 @@ public class PluginBag<T> implements AutoCloseable {
|
|||
public ByteBuffer getFileContent(String entryName) throws IOException {
|
||||
if (jarContent == null)
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "jar not available: " + name + "/" + version);
|
||||
return BlobRepository.getFileContent(jarContent.blob, entryName);
|
||||
return getFileContent(jarContent.blob, entryName);
|
||||
|
||||
}
|
||||
|
||||
public ByteBuffer getFileContent(BlobRepository.BlobContent<ByteBuffer> blobContent, String entryName) throws IOException {
|
||||
ByteBuffer buff = blobContent.get();
|
||||
ByteArrayInputStream zipContents = new ByteArrayInputStream(buff.array(), buff.arrayOffset(), buff.limit());
|
||||
ZipInputStream zis = new ZipInputStream(zipContents);
|
||||
try {
|
||||
ZipEntry entry;
|
||||
while ((entry = zis.getNextEntry()) != null) {
|
||||
if (entryName == null || entryName.equals(entry.getName())) {
|
||||
SimplePostTool.BAOS out = new SimplePostTool.BAOS();
|
||||
byte[] buffer = new byte[2048];
|
||||
int size;
|
||||
while ((size = zis.read(buffer, 0, buffer.length)) != -1) {
|
||||
out.write(buffer, 0, size);
|
||||
}
|
||||
out.close();
|
||||
return out.getByteBuffer();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
zis.closeEntry();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
if (jarContent != null) coreContainer.getBlobRepository().decrementBlobRefCount(jarContent);
|
||||
|
@ -472,7 +501,7 @@ public class PluginBag<T> implements AutoCloseable {
|
|||
}
|
||||
|
||||
try {
|
||||
String matchedKey = jarContent.blob.checkSignature(sig, new CryptoKeys(keys));
|
||||
String matchedKey = new CryptoKeys(keys).verify(sig, jarContent.blob.get());
|
||||
if (matchedKey == null)
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "No key matched signature for jar : " + name + " version: " + version);
|
||||
log.info("Jar {} signed with {} successfully verified", name, matchedKey);
|
||||
|
|
|
@ -2569,6 +2569,38 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
|
|||
}
|
||||
return implicits;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method to load a blob. This method minimizes the degree to which component and other code needs
|
||||
* to depend on the structure of solr's object graph and ensures that a proper close hook is registered. This method
|
||||
* should normally be called in {@link SolrCoreAware#inform(SolrCore)}, and should never be called during request
|
||||
* processing. The Decoder will only run on the first invocations, subsequent invocations will return the
|
||||
* cached object.
|
||||
*
|
||||
* @param key A key in the format of name/version for a blob stored in the .system blob store via the Blob Store API
|
||||
* @param decoder a decoder with which to convert the blob into a Java Object representation (first time only)
|
||||
* @return a reference to the blob that has already cached the decoded version.
|
||||
*/
|
||||
public BlobRepository.BlobContentRef loadDecodeAndCacheBlob(String key, BlobRepository.Decoder<Object> decoder) {
|
||||
// make sure component authors don't give us oddball keys with no version...
|
||||
if (!BlobRepository.BLOB_KEY_PATTERN_CHECKER.matcher(key).matches()) {
|
||||
throw new IllegalArgumentException("invalid key format, must end in /N where N is the version number");
|
||||
}
|
||||
CoreContainer coreContainer = getCoreDescriptor().getCoreContainer();
|
||||
// define the blob
|
||||
BlobRepository.BlobContentRef blobRef = coreContainer.getBlobRepository().getBlobIncRef(key, decoder);
|
||||
addCloseHook(new CloseHook() {
|
||||
@Override
|
||||
public void preClose(SolrCore core) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postClose(SolrCore core) {
|
||||
core.getCoreDescriptor().getCoreContainer().getBlobRepository().decrementBlobRefCount(blobRef);
|
||||
}
|
||||
});
|
||||
return blobRef;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<schema name="minimal" version="1.1">
|
||||
<types>
|
||||
<fieldType name="string" class="solr.StrField"/>
|
||||
</types>
|
||||
<fields>
|
||||
<dynamicField name="*" type="string" indexed="true" stored="true" />
|
||||
</fields>
|
||||
</schema>
|
|
@ -0,0 +1,51 @@
|
|||
<?xml version="1.0" ?>
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
|
||||
|
||||
<config>
|
||||
|
||||
<dataDir>${solr.data.dir:}</dataDir>
|
||||
|
||||
<directoryFactory name="DirectoryFactory"
|
||||
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
|
||||
<schemaFactory class="ClassicIndexSchemaFactory"/>
|
||||
|
||||
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
|
||||
|
||||
<updateHandler class="solr.DirectUpdateHandler2">
|
||||
<commitWithin>
|
||||
<softCommit>${solr.commitwithin.softcommit:true}</softCommit>
|
||||
</commitWithin>
|
||||
|
||||
</updateHandler>
|
||||
<searchComponent name="testComponent" class="org.apache.solr.handler.component.ResourceSharingTestComponent" />
|
||||
|
||||
<requestHandler name="/select" class="solr.SearchHandler">
|
||||
<lst name="defaults">
|
||||
<str name="echoParams">explicit</str>
|
||||
<str name="indent">true</str>
|
||||
<str name="df">text</str>
|
||||
</lst>
|
||||
<arr name="first-components">
|
||||
<str>testComponent</str>
|
||||
</arr>
|
||||
</requestHandler>
|
||||
</config>
|
||||
|
|
@ -0,0 +1,138 @@
|
|||
package org.apache.solr.core;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.HashMap;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.cloud.ZkTestServer;
|
||||
import org.apache.solr.common.SolrDocumentList;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.zookeeper.server.DataNode;
|
||||
import org.apache.zookeeper.server.DataTree;
|
||||
import org.apache.zookeeper.server.ZKDatabase;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
public class BlobRepositoryCloudTest extends SolrCloudTestCase {
|
||||
|
||||
public static final Path TEST_PATH = getFile("solr/configsets").toPath();
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(1) // only sharing *within* a node
|
||||
.addConfig("configname", TEST_PATH.resolve("resource-sharing"))
|
||||
.configure();
|
||||
// Thread.sleep(2000);
|
||||
HashMap<String, String> params = new HashMap<>();
|
||||
cluster.createCollection(".system", 1, 1, null, params);
|
||||
// Thread.sleep(2000);
|
||||
// test component will fail if it cant' find a blob with this data by this name
|
||||
postBlob("testResource", "foo,bar\nbaz,bam");
|
||||
// Thread.sleep(2000);
|
||||
// if these don't load we probably failed to post the blob above
|
||||
cluster.createCollection("col1", 1, 1, "configname", params);
|
||||
cluster.createCollection("col2", 1, 1, "configname", params);
|
||||
// Thread.sleep(2000);
|
||||
SolrInputDocument document = new SolrInputDocument();
|
||||
document.addField("id", "1");
|
||||
document.addField("text", "col1");
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
solrClient.add("col1", document);
|
||||
solrClient.commit("col1");
|
||||
document = new SolrInputDocument();
|
||||
document.addField("id", "1");
|
||||
document.addField("text", "col2");
|
||||
solrClient.add("col2", document);
|
||||
solrClient.commit("col2");
|
||||
Thread.sleep(2000);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
// This test relies on the installation of ResourceSharingTestComponent which has 2 useful properties:
|
||||
// 1. it will fail to initialize if it doesn't find a 2 line CSV like foo,bar\nbaz,bam thus validating
|
||||
// that we are properly pulling data from the blob store
|
||||
// 2. It replaces any q for a query request to /select with "text:<name>" where <name> is the name
|
||||
// of the last collection to run a query. It does this by caching a shared resource of type
|
||||
// ResourceSharingTestComponent.TestObject, and the following sequence is proof that either
|
||||
// collection can tell if it was (or was not) the last collection to issue a query by
|
||||
// consulting the shared object
|
||||
assertLastQueryNotToCollection("col1");
|
||||
assertLastQueryNotToCollection("col2");
|
||||
assertLastQueryNotToCollection("col1");
|
||||
assertLastQueryToCollection("col1");
|
||||
assertLastQueryNotToCollection("col2");
|
||||
assertLastQueryToCollection("col2");
|
||||
}
|
||||
|
||||
// TODO: move this up to parent class? Probably accepting entity, or with alternative signatures
|
||||
private static void postBlob(String name, String string) throws IOException {
|
||||
HttpPost post = new HttpPost(findLiveNodeURI() + "/.system/blob/" + name);
|
||||
StringEntity csv = new StringEntity(string, ContentType.create("application/octet-stream"));
|
||||
post.setEntity(csv);
|
||||
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
|
||||
httpclient.execute(post);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: move this up to parent class?
|
||||
private static String findLiveNodeURI() {
|
||||
ZkTestServer zkServer = cluster.getZkServer();
|
||||
ZKDatabase zkDatabase = zkServer.getZKDatabase();
|
||||
DataTree dataTree = zkDatabase.getDataTree();
|
||||
DataNode node = dataTree.getNode("/solr/live_nodes");
|
||||
Set<String> children = node.getChildren();
|
||||
String liveNode = children.iterator().next();
|
||||
String[] split = liveNode.split("_");
|
||||
String host = split[0];
|
||||
String name = split[1];
|
||||
return "http://" + host + "/" + name;
|
||||
}
|
||||
|
||||
private void assertLastQueryToCollection(String collection) throws SolrServerException, IOException {
|
||||
assertEquals(1, getSolrDocuments(collection).size());
|
||||
}
|
||||
|
||||
private void assertLastQueryNotToCollection(String collection) throws SolrServerException, IOException {
|
||||
assertEquals(0, getSolrDocuments(collection).size());
|
||||
}
|
||||
|
||||
private SolrDocumentList getSolrDocuments(String collection) throws SolrServerException, IOException {
|
||||
SolrQuery query = new SolrQuery("*:*");
|
||||
CloudSolrClient client = cluster.getSolrClient();
|
||||
QueryResponse resp1 = client.query(collection, query);
|
||||
return resp1.getResults();
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,165 @@
|
|||
package org.apache.solr.core;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.StringWriter;
|
||||
import java.lang.reflect.Field;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.easymock.EasyMock.anyObject;
|
||||
import static org.easymock.EasyMock.eq;
|
||||
import static org.easymock.EasyMock.expect;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
public class BlobRepositoryMockingTest {
|
||||
|
||||
private static final Charset UTF8 = Charset.forName("UTF-8");
|
||||
private static final String[][] PARSED = new String[][]{{"foo", "bar", "baz"}, {"bang", "boom", "bash"}};
|
||||
private static final String BLOBSTR = "foo,bar,baz\nbang,boom,bash";
|
||||
private CoreContainer mockContainer = EasyMock.createMock(CoreContainer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
private ConcurrentHashMap<String, BlobRepository.BlobContent> mapMock = EasyMock.createMock(ConcurrentHashMap.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
private BlobRepository.Decoder<Object> decoderMock = EasyMock.createMock(BlobRepository.Decoder.class);;
|
||||
@SuppressWarnings("unchecked")
|
||||
private BlobRepository.BlobContent<Object> blobContentMock = EasyMock.createMock(BlobRepository.BlobContent.class);
|
||||
|
||||
private Object[] mocks = new Object[] {
|
||||
mockContainer,
|
||||
decoderMock,
|
||||
blobContentMock,
|
||||
mapMock
|
||||
};
|
||||
|
||||
BlobRepository repository;
|
||||
ByteBuffer blobData = ByteBuffer.wrap(BLOBSTR.getBytes(UTF8));
|
||||
boolean blobFetched = false;
|
||||
String blobKey = "";
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() throws IllegalAccessException, NoSuchFieldException {
|
||||
blobFetched = false;
|
||||
blobKey = "";
|
||||
EasyMock.reset(mocks);
|
||||
repository = new BlobRepository(mockContainer) {
|
||||
@Override
|
||||
ByteBuffer fetchBlob(String key) {
|
||||
blobKey = key;
|
||||
blobFetched = true;
|
||||
return blobData;
|
||||
}
|
||||
|
||||
@Override
|
||||
ConcurrentHashMap<String, BlobContent> createMap() {
|
||||
return mapMock;
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
EasyMock.verify(mocks);
|
||||
}
|
||||
|
||||
@Test (expected = SolrException.class)
|
||||
public void testCloudOnly() {
|
||||
expect(mockContainer.isZooKeeperAware()).andReturn(false);
|
||||
EasyMock.replay(mocks);
|
||||
BlobRepository.BlobContentRef ref = repository.getBlobIncRef("foo!");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testGetBlobIncrRefString() {
|
||||
expect(mockContainer.isZooKeeperAware()).andReturn(true);
|
||||
expect(mapMock.get("foo!")).andReturn(null);
|
||||
expect(mapMock.put(eq("foo!"), anyObject(BlobRepository.BlobContent.class))).andReturn(null);
|
||||
EasyMock.replay(mocks);
|
||||
BlobRepository.BlobContentRef ref = repository.getBlobIncRef("foo!");
|
||||
assertTrue("foo!".equals(blobKey));
|
||||
assertTrue(blobFetched);
|
||||
assertNotNull(ref.blob);
|
||||
assertEquals(blobData, ref.blob.get());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testCachedAlready() {
|
||||
expect(mockContainer.isZooKeeperAware()).andReturn(true);
|
||||
expect(mapMock.get("foo!")).andReturn(new BlobRepository.BlobContent<BlobRepository>("foo!", blobData));
|
||||
EasyMock.replay(mocks);
|
||||
BlobRepository.BlobContentRef ref = repository.getBlobIncRef("foo!");
|
||||
assertEquals("",blobKey);
|
||||
assertFalse(blobFetched);
|
||||
assertNotNull(ref.blob);
|
||||
assertEquals(blobData, ref.blob.get());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testGetBlobIncrRefStringDecoder() {
|
||||
expect(mockContainer.isZooKeeperAware()).andReturn(true);
|
||||
expect(mapMock.get("foo!mocked")).andReturn(null);
|
||||
expect(mapMock.put(eq("foo!mocked"), anyObject(BlobRepository.BlobContent.class))).andReturn(null);
|
||||
|
||||
EasyMock.replay(mocks);
|
||||
BlobRepository.BlobContentRef ref = repository.getBlobIncRef("foo!", new BlobRepository.Decoder<Object>() {
|
||||
@Override
|
||||
public Object decode(InputStream inputStream) {
|
||||
StringWriter writer = new StringWriter();
|
||||
try {
|
||||
IOUtils.copy(inputStream, writer, UTF8);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
assertEquals(BLOBSTR, writer.toString());
|
||||
return PARSED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "mocked";
|
||||
}
|
||||
});
|
||||
assertEquals("foo!",blobKey);
|
||||
assertTrue(blobFetched);
|
||||
assertNotNull(ref.blob);
|
||||
assertEquals(PARSED, ref.blob.get());
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -104,7 +104,7 @@ public class TestDynamicLoading extends AbstractFullDistribZkTestBase {
|
|||
Map map = TestSolrConfigHandler.getRespMap("/test1?wt=json", client);
|
||||
|
||||
assertNotNull(TestBlobHandler.getAsString(map), map = (Map) map.get("error"));
|
||||
assertEquals(TestBlobHandler.getAsString(map), ".system collection not available", map.get("msg"));
|
||||
assertTrue(TestBlobHandler.getAsString(map), map.get("msg").toString().contains(".system collection not available"));
|
||||
|
||||
|
||||
TestBlobHandler.createSystemCollection(getHttpSolrClient(baseURL, randomClient.getHttpClient()));
|
||||
|
@ -114,7 +114,7 @@ public class TestDynamicLoading extends AbstractFullDistribZkTestBase {
|
|||
|
||||
|
||||
assertNotNull(map = (Map) map.get("error"));
|
||||
assertEquals("full output " + TestBlobHandler.getAsString(map), "no such blob or version available: colltest/1" , map.get("msg"));
|
||||
assertTrue("full output " + TestBlobHandler.getAsString(map), map.get("msg").toString().contains("no such blob or version available: colltest/1" ));
|
||||
payload = " {\n" +
|
||||
" 'set' : {'watched': {" +
|
||||
" 'x':'X val',\n" +
|
||||
|
|
|
@ -0,0 +1,149 @@
|
|||
package org.apache.solr.handler.component;
|
||||
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.core.BlobRepository;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.util.plugin.SolrCoreAware;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
public class ResourceSharingTestComponent extends SearchComponent implements SolrCoreAware {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private SolrCore core;
|
||||
private volatile BlobRepository.BlobContent<TestObject> blob;
|
||||
|
||||
@SuppressWarnings("SynchronizeOnNonFinalField")
|
||||
@Override
|
||||
public void prepare(ResponseBuilder rb) throws IOException {
|
||||
SolrParams params = rb.req.getParams();
|
||||
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
|
||||
String q = "text:" + getTestObj().getLastCollection();
|
||||
mParams.set("q", q); // search for the last collection name.
|
||||
// This should cause the param to show up in the response...
|
||||
rb.req.setParams(mParams);
|
||||
getTestObj().setLastCollection(core.getCoreDescriptor().getCollectionName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(ResponseBuilder rb) throws IOException {}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return "ResourceSharingTestComponent";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSource() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
TestObject getTestObj() {
|
||||
return this.blob.get();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void inform(SolrCore core) {
|
||||
log.info("Informing test component...");
|
||||
this.core = core;
|
||||
this.blob = core.loadDecodeAndCacheBlob(getKey(), new DumbCsvDecoder()).blob;
|
||||
log.info("Test component informed!");
|
||||
}
|
||||
|
||||
private String getKey() {
|
||||
return getResourceName() + "/" + getResourceVersion();
|
||||
}
|
||||
|
||||
public String getResourceName() {
|
||||
return "testResource";
|
||||
}
|
||||
|
||||
public String getResourceVersion() {
|
||||
return "1";
|
||||
}
|
||||
|
||||
class DumbCsvDecoder implements BlobRepository.Decoder<Object> {
|
||||
private final Map<String, String> dict = new HashMap<>();
|
||||
|
||||
public DumbCsvDecoder() {}
|
||||
|
||||
void processSimpleCsvRow(String string) {
|
||||
String[] row = string.split(","); // dumbest csv parser ever... :)
|
||||
getDict().put(row[0], row[1]);
|
||||
}
|
||||
|
||||
public Map<String, String> getDict() {
|
||||
return dict;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestObject decode(InputStream inputStream) {
|
||||
// loading a tiny csv like:
|
||||
//
|
||||
// foo,bar
|
||||
// baz,bam
|
||||
|
||||
try (Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream, Charset.forName("UTF-8"))).lines()) {
|
||||
lines.forEach(this::processSimpleCsvRow);
|
||||
} catch (Exception e) {
|
||||
log.error("failed to read dictionary {}", getResourceName() );
|
||||
throw new RuntimeException("Cannot load dictionary " , e);
|
||||
}
|
||||
|
||||
assertEquals("bar", dict.get("foo"));
|
||||
assertEquals("bam", dict.get("baz"));
|
||||
log.info("Loaded {} using {}", getDict().size(), this.getClass().getClassLoader());
|
||||
|
||||
// if we get here we have seen the data from the blob and all we need is to test that two collections
|
||||
// are able to see the same object..
|
||||
return new TestObject();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static class TestObject {
|
||||
public static final String NEVER_UPDATED = "never updated";
|
||||
private volatile String lastCollection = NEVER_UPDATED;
|
||||
|
||||
public String getLastCollection() {
|
||||
return this.lastCollection;
|
||||
}
|
||||
|
||||
public void setLastCollection(String lastCollection) {
|
||||
this.lastCollection = lastCollection;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue