SOLR-8349: Allow sharing of large in memory data structures across cores

This commit is contained in:
Noble Paul 2016-04-18 15:51:19 +05:30
parent a62752699a
commit 489acdb509
9 changed files with 597 additions and 5 deletions

View File

@ -67,6 +67,9 @@ New Features
* SOLR-8962: Add sort Streaming Expression. The expression takes a single input stream and a
comparator and outputs tuples in stable order of the comparator. (Dennis Gove)
* SOLR-8349: Allow sharing of large in-memory data structures across cores (Gus Heck, noble)
Bug Fixes
----------------------

View File

@ -16,6 +16,7 @@
*/
package org.apache.solr.core;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
@ -28,6 +29,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.lucene.analysis.util.ResourceLoader;
import org.apache.lucene.analysis.util.ResourceLoaderAware;
@ -38,6 +41,7 @@ import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.component.SearchComponent;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.util.CryptoKeys;
import org.apache.solr.util.SimplePostTool;
import org.apache.solr.util.plugin.NamedListInitializedPlugin;
import org.apache.solr.util.plugin.PluginInfoInitialized;
import org.apache.solr.util.plugin.SolrCoreAware;
@ -386,7 +390,7 @@ public class PluginBag<T> implements AutoCloseable {
*/
public static class RuntimeLib implements PluginInfoInitialized, AutoCloseable {
private String name, version, sig;
private BlobRepository.BlobContentRef jarContent;
private BlobRepository.BlobContentRef<ByteBuffer> jarContent;
private final CoreContainer coreContainer;
private boolean verified = false;
@ -430,10 +434,35 @@ public class PluginBag<T> implements AutoCloseable {
/**
 * Returns the content of one entry of this runtime library's jar.
 *
 * @param entryName name of the jar entry to read; passed through unchanged to the two-argument
 *                  {@code getFileContent} together with {@code jarContent.blob}
 * @return whatever the two-argument {@code getFileContent} returns for that entry
 * @throws SolrException with {@code SERVER_ERROR} when no jar content has been loaded yet
 *                       ({@code jarContent == null})
 * @throws IOException propagated from reading the jar bytes
 */
public ByteBuffer getFileContent(String entryName) throws IOException {
if (jarContent == null)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "jar not available: " + name + "/" + version);
// NOTE(review): the next two lines look like the removed and the added side of this diff hunk
// (static BlobRepository helper vs. the local overload); presumably only the second one
// exists after the commit - confirm against the real file.
return BlobRepository.getFileContent(jarContent.blob, entryName);
return getFileContent(jarContent.blob, entryName);
}
/**
 * Extracts a single entry from a zip/jar archive that is held in memory.
 *
 * @param blobContent blob whose {@link ByteBuffer} holds the raw archive bytes; the buffer must be
 *                    array-backed because its backing array is read directly
 * @param entryName   exact name of the entry to extract; {@code null} matches the first entry found
 * @return the uncompressed bytes of the matching entry, or {@code null} when the archive has no
 *         matching entry
 * @throws IOException if the archive cannot be read
 */
public ByteBuffer getFileContent(BlobRepository.BlobContent<ByteBuffer> blobContent, String entryName) throws IOException {
  ByteBuffer buff = blobContent.get();
  // Expose exactly the readable window of the buffer: start at its current position inside the
  // backing array and stop at its limit. For a buffer positioned at 0 this equals the old
  // (arrayOffset, limit) window.
  ByteArrayInputStream zipContents =
      new ByteArrayInputStream(buff.array(), buff.arrayOffset() + buff.position(), buff.remaining());
  // try-with-resources closes the ZipInputStream (and releases its Inflater) on every path and
  // does not mask an exception thrown while reading, unlike closeEntry() in a finally block.
  try (ZipInputStream zis = new ZipInputStream(zipContents)) {
    ZipEntry entry;
    while ((entry = zis.getNextEntry()) != null) {
      if (entryName == null || entryName.equals(entry.getName())) {
        SimplePostTool.BAOS out = new SimplePostTool.BAOS();
        byte[] buffer = new byte[2048];
        int size;
        while ((size = zis.read(buffer, 0, buffer.length)) != -1) {
          out.write(buffer, 0, size);
        }
        out.close();
        return out.getByteBuffer();
      }
    }
  }
  return null;
}
@Override
public void close() throws Exception {
if (jarContent != null) coreContainer.getBlobRepository().decrementBlobRefCount(jarContent);
@ -472,7 +501,7 @@ public class PluginBag<T> implements AutoCloseable {
}
try {
String matchedKey = jarContent.blob.checkSignature(sig, new CryptoKeys(keys));
String matchedKey = new CryptoKeys(keys).verify(sig, jarContent.blob.get());
if (matchedKey == null)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "No key matched signature for jar : " + name + " version: " + version);
log.info("Jar {} signed with {} successfully verified", name, matchedKey);

View File

@ -2569,6 +2569,38 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
}
return implicits;
}
/**
 * Loads a blob from the blob repository, decodes it, and ties its lifetime to this core.
 * <p>
 * Using this helper keeps components from having to navigate solr's object graph themselves and
 * guarantees that a close hook releasing the blob reference is registered. Call it from
 * {@link SolrCoreAware#inform(SolrCore)}; it should never be called during request processing.
 * The decoder only runs on the first invocation; subsequent invocations return the cached object.
 *
 * @param key     a key of the form name/version identifying a blob stored in the .system blob
 *                store via the Blob Store API
 * @param decoder converts the raw blob into its Java object representation (first time only)
 * @return a reference to the blob, which already holds the decoded version
 * @throws IllegalArgumentException if {@code key} does not end in a version number
 */
public BlobRepository.BlobContentRef loadDecodeAndCacheBlob(String key, BlobRepository.Decoder<Object> decoder) {
  // reject keys without a trailing version before touching the repository
  final boolean wellFormedKey = BlobRepository.BLOB_KEY_PATTERN_CHECKER.matcher(key).matches();
  if (!wellFormedKey) {
    throw new IllegalArgumentException("invalid key format, must end in /N where N is the version number");
  }

  // obtain the (possibly already cached) blob and bump its reference count
  final CoreContainer container = getCoreDescriptor().getCoreContainer();
  final BlobRepository.BlobContentRef cachedRef = container.getBlobRepository().getBlobIncRef(key, decoder);

  // give the reference back once this core has finished closing
  final CloseHook releaseOnClose = new CloseHook() {
    @Override
    public void preClose(SolrCore closingCore) {
    }

    @Override
    public void postClose(SolrCore closedCore) {
      closedCore.getCoreDescriptor().getCoreContainer().getBlobRepository().decrementBlobRefCount(cachedRef);
    }
  };
  addCloseHook(releaseOnClose);
  return cachedRef;
}
}

View File

@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Minimal schema: one string field type plus a catch-all dynamic field. -->
<schema name="minimal" version="1.1">
<types>
<fieldType name="string" class="solr.StrField"/>
</types>
<fields>
<!-- The "*" pattern maps every field name to the indexed, stored string type above. -->
<dynamicField name="*" type="string" indexed="true" stored="true" />
</fields>
</schema>

View File

@ -0,0 +1,51 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Minimal solrconfig.xml for the resource-sharing test. The only request handler declared in
     this file is /select, which lists the ResourceSharingTestComponent as a first-component. -->
<config>
<dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
<schemaFactory class="ClassicIndexSchemaFactory"/>
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<updateHandler class="solr.DirectUpdateHandler2">
<commitWithin>
<softCommit>${solr.commitwithin.softcommit:true}</softCommit>
</commitWithin>
</updateHandler>
<!-- Registers the test component under the name referenced by /select below. -->
<searchComponent name="testComponent" class="org.apache.solr.handler.component.ResourceSharingTestComponent" />
<requestHandler name="/select" class="solr.SearchHandler">
<lst name="defaults">
<str name="echoParams">explicit</str>
<str name="indent">true</str>
<str name="df">text</str>
</lst>
<!-- testComponent is listed as a first-component of this handler. -->
<arr name="first-components">
<str>testComponent</str>
</arr>
</requestHandler>
</config>

View File

@ -0,0 +1,138 @@
package org.apache.solr.core;
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Set;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.apache.zookeeper.server.DataNode;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.ZKDatabase;
import org.junit.BeforeClass;
import org.junit.Test;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class BlobRepositoryCloudTest extends SolrCloudTestCase {
public static final Path TEST_PATH = getFile("solr/configsets").toPath();
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(1) // only sharing *within* a node
.addConfig("configname", TEST_PATH.resolve("resource-sharing"))
.configure();
// Thread.sleep(2000);
HashMap<String, String> params = new HashMap<>();
cluster.createCollection(".system", 1, 1, null, params);
// Thread.sleep(2000);
// test component will fail if it cant' find a blob with this data by this name
postBlob("testResource", "foo,bar\nbaz,bam");
// Thread.sleep(2000);
// if these don't load we probably failed to post the blob above
cluster.createCollection("col1", 1, 1, "configname", params);
cluster.createCollection("col2", 1, 1, "configname", params);
// Thread.sleep(2000);
SolrInputDocument document = new SolrInputDocument();
document.addField("id", "1");
document.addField("text", "col1");
CloudSolrClient solrClient = cluster.getSolrClient();
solrClient.add("col1", document);
solrClient.commit("col1");
document = new SolrInputDocument();
document.addField("id", "1");
document.addField("text", "col2");
solrClient.add("col2", document);
solrClient.commit("col2");
Thread.sleep(2000);
}
@Test
public void test() throws Exception {
// This test relies on the installation of ResourceSharingTestComponent which has 2 useful properties:
// 1. it will fail to initialize if it doesn't find a 2 line CSV like foo,bar\nbaz,bam thus validating
// that we are properly pulling data from the blob store
// 2. It replaces any q for a query request to /select with "text:<name>" where <name> is the name
// of the last collection to run a query. It does this by caching a shared resource of type
// ResourceSharingTestComponent.TestObject, and the following sequence is proof that either
// collection can tell if it was (or was not) the last collection to issue a query by
// consulting the shared object
assertLastQueryNotToCollection("col1");
assertLastQueryNotToCollection("col2");
assertLastQueryNotToCollection("col1");
assertLastQueryToCollection("col1");
assertLastQueryNotToCollection("col2");
assertLastQueryToCollection("col2");
}
// TODO: move this up to parent class? Probably accepting entity, or with alternative signatures
private static void postBlob(String name, String string) throws IOException {
HttpPost post = new HttpPost(findLiveNodeURI() + "/.system/blob/" + name);
StringEntity csv = new StringEntity(string, ContentType.create("application/octet-stream"));
post.setEntity(csv);
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
httpclient.execute(post);
}
}
// TODO: move this up to parent class?
private static String findLiveNodeURI() {
ZkTestServer zkServer = cluster.getZkServer();
ZKDatabase zkDatabase = zkServer.getZKDatabase();
DataTree dataTree = zkDatabase.getDataTree();
DataNode node = dataTree.getNode("/solr/live_nodes");
Set<String> children = node.getChildren();
String liveNode = children.iterator().next();
String[] split = liveNode.split("_");
String host = split[0];
String name = split[1];
return "http://" + host + "/" + name;
}
private void assertLastQueryToCollection(String collection) throws SolrServerException, IOException {
assertEquals(1, getSolrDocuments(collection).size());
}
private void assertLastQueryNotToCollection(String collection) throws SolrServerException, IOException {
assertEquals(0, getSolrDocuments(collection).size());
}
private SolrDocumentList getSolrDocuments(String collection) throws SolrServerException, IOException {
SolrQuery query = new SolrQuery("*:*");
CloudSolrClient client = cluster.getSolrClient();
QueryResponse resp1 = client.query(collection, query);
return resp1.getResults();
}
}

View File

@ -0,0 +1,165 @@
package org.apache.solr.core;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.IOUtils;
import org.apache.solr.common.SolrException;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * Unit tests for {@link BlobRepository} in which the core container and the backing map are
 * EasyMock mocks and the blob fetch is stubbed out, so no running Solr is involved.
 */
public class BlobRepositoryMockingTest {

  private static final Charset UTF8 = Charset.forName("UTF-8");
  private static final String[][] PARSED = new String[][]{{"foo", "bar", "baz"}, {"bang", "boom", "bash"}};
  private static final String BLOBSTR = "foo,bar,baz\nbang,boom,bash";

  private CoreContainer mockContainer = EasyMock.createMock(CoreContainer.class);
  @SuppressWarnings("unchecked")
  private ConcurrentHashMap<String, BlobRepository.BlobContent> mapMock = EasyMock.createMock(ConcurrentHashMap.class);
  @SuppressWarnings("unchecked")
  private BlobRepository.Decoder<Object> decoderMock = EasyMock.createMock(BlobRepository.Decoder.class);
  @SuppressWarnings("unchecked")
  private BlobRepository.BlobContent<Object> blobContentMock = EasyMock.createMock(BlobRepository.BlobContent.class);

  // every mock is reset before and verified after each test
  private Object[] mocks = new Object[] {
      mockContainer,
      decoderMock,
      blobContentMock,
      mapMock
  };

  BlobRepository repository;
  ByteBuffer blobData = ByteBuffer.wrap(BLOBSTR.getBytes(UTF8));
  // recorded by the stubbed fetchBlob below so tests can tell whether, and with which key, a
  // fetch happened
  boolean blobFetched = false;
  String blobKey = "";

  /** Resets the mocks and builds a repository whose fetch and map creation are stubbed. */
  @Before
  public void setUp() {
    blobFetched = false;
    blobKey = "";
    EasyMock.reset(mocks);
    repository = new BlobRepository(mockContainer) {
      @Override
      ByteBuffer fetchBlob(String key) {
        blobKey = key;
        blobFetched = true;
        return blobData;
      }

      @Override
      ConcurrentHashMap<String, BlobContent> createMap() {
        return mapMock;
      }
    };
  }

  @After
  public void tearDown() {
    EasyMock.verify(mocks);
  }

  /** A container that is not ZooKeeper aware must be rejected with a SolrException. */
  @Test (expected = SolrException.class)
  public void testCloudOnly() {
    expect(mockContainer.isZooKeeperAware()).andReturn(false);
    EasyMock.replay(mocks);
    repository.getBlobIncRef("foo!");
  }

  /** A blob missing from the map is fetched, stored under its key and returned as raw bytes. */
  @SuppressWarnings("unchecked")
  @Test
  public void testGetBlobIncrRefString() {
    expect(mockContainer.isZooKeeperAware()).andReturn(true);
    expect(mapMock.get("foo!")).andReturn(null);
    expect(mapMock.put(eq("foo!"), anyObject(BlobRepository.BlobContent.class))).andReturn(null);
    EasyMock.replay(mocks);
    BlobRepository.BlobContentRef ref = repository.getBlobIncRef("foo!");
    assertEquals("foo!", blobKey);
    assertTrue(blobFetched);
    assertNotNull(ref.blob);
    assertEquals(blobData, ref.blob.get());
  }

  /** A blob already present in the map is returned without another fetch. */
  @SuppressWarnings("unchecked")
  @Test
  public void testCachedAlready() {
    expect(mockContainer.isZooKeeperAware()).andReturn(true);
    // the cached content is the raw byte buffer, so that is its type parameter
    expect(mapMock.get("foo!")).andReturn(new BlobRepository.BlobContent<ByteBuffer>("foo!", blobData));
    EasyMock.replay(mocks);
    BlobRepository.BlobContentRef ref = repository.getBlobIncRef("foo!");
    assertEquals("", blobKey);
    assertFalse(blobFetched);
    assertNotNull(ref.blob);
    assertEquals(blobData, ref.blob.get());
  }

  /**
   * With a decoder the blob is fetched under its plain key, while the map is expected to be
   * consulted with the key followed by the decoder's name ("foo!mocked"), and the decoded object
   * is what the reference hands back.
   */
  @SuppressWarnings("unchecked")
  @Test
  public void testGetBlobIncrRefStringDecoder() {
    expect(mockContainer.isZooKeeperAware()).andReturn(true);
    expect(mapMock.get("foo!mocked")).andReturn(null);
    expect(mapMock.put(eq("foo!mocked"), anyObject(BlobRepository.BlobContent.class))).andReturn(null);
    EasyMock.replay(mocks);
    BlobRepository.BlobContentRef ref = repository.getBlobIncRef("foo!", new BlobRepository.Decoder<Object>() {
      @Override
      public Object decode(InputStream inputStream) {
        StringWriter writer = new StringWriter();
        try {
          IOUtils.copy(inputStream, writer, UTF8);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        assertEquals(BLOBSTR, writer.toString());
        return PARSED;
      }

      @Override
      public String getName() {
        return "mocked";
      }
    });
    assertEquals("foo!", blobKey);
    assertTrue(blobFetched);
    assertNotNull(ref.blob);
    assertEquals(PARSED, ref.blob.get());
  }
}

View File

@ -104,7 +104,7 @@ public class TestDynamicLoading extends AbstractFullDistribZkTestBase {
Map map = TestSolrConfigHandler.getRespMap("/test1?wt=json", client);
assertNotNull(TestBlobHandler.getAsString(map), map = (Map) map.get("error"));
assertEquals(TestBlobHandler.getAsString(map), ".system collection not available", map.get("msg"));
assertTrue(TestBlobHandler.getAsString(map), map.get("msg").toString().contains(".system collection not available"));
TestBlobHandler.createSystemCollection(getHttpSolrClient(baseURL, randomClient.getHttpClient()));
@ -114,7 +114,7 @@ public class TestDynamicLoading extends AbstractFullDistribZkTestBase {
assertNotNull(map = (Map) map.get("error"));
assertEquals("full output " + TestBlobHandler.getAsString(map), "no such blob or version available: colltest/1" , map.get("msg"));
assertTrue("full output " + TestBlobHandler.getAsString(map), map.get("msg").toString().contains("no such blob or version available: colltest/1" ));
payload = " {\n" +
" 'set' : {'watched': {" +
" 'x':'X val',\n" +

View File

@ -0,0 +1,149 @@
package org.apache.solr.handler.component;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.BlobRepository;
import org.apache.solr.core.SolrCore;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.invoke.MethodHandles;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * Test-only search component that demonstrates sharing one decoded blob between cores.
 * <p>
 * On {@link #inform(SolrCore)} it loads the blob "testResource/1" through
 * {@link SolrCore#loadDecodeAndCacheBlob}, and on every request it rewrites {@code q} to
 * {@code text:<last collection>} using the {@link TestObject} obtained from that blob, then
 * records its own collection as the last one.
 */
public class ResourceSharingTestComponent extends SearchComponent implements SolrCoreAware {

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private SolrCore core;
  private volatile BlobRepository.BlobContent<TestObject> blob;

  /**
   * Replaces the request's {@code q} with a search for the collection recorded in the shared
   * object, then records this core's collection there.
   */
  @Override
  public void prepare(ResponseBuilder rb) throws IOException {
    SolrParams params = rb.req.getParams();
    ModifiableSolrParams mParams = new ModifiableSolrParams(params);
    // look the shared object up once and use it for both the read and the write
    TestObject shared = getTestObj();
    String q = "text:" + shared.getLastCollection();
    mParams.set("q", q); // search for the last collection name.
    // This should cause the param to show up in the response...
    rb.req.setParams(mParams);
    shared.setLastCollection(core.getCoreDescriptor().getCollectionName());
  }

  @Override
  public void process(ResponseBuilder rb) throws IOException {}

  @Override
  public String getDescription() {
    return "ResourceSharingTestComponent";
  }

  @Override
  public String getSource() {
    return null;
  }

  /** Returns the object decoded from the blob; only valid after {@link #inform(SolrCore)}. */
  TestObject getTestObj() {
    return this.blob.get();
  }

  /** Remembers the core and loads (or picks up the cached) decoded blob for it. */
  @SuppressWarnings("unchecked") // loadDecodeAndCacheBlob returns a raw reference
  @Override
  public void inform(SolrCore core) {
    log.info("Informing test component...");
    this.core = core;
    this.blob = core.loadDecodeAndCacheBlob(getKey(), new DumbCsvDecoder()).blob;
    log.info("Test component informed!");
  }

  private String getKey() {
    return getResourceName() + "/" + getResourceVersion();
  }

  public String getResourceName() {
    return "testResource";
  }

  public String getResourceVersion() {
    return "1";
  }

  /**
   * Decoder that reads a two-column CSV into a map, checks it contains the expected test rows,
   * and returns a fresh {@link TestObject}.
   */
  class DumbCsvDecoder implements BlobRepository.Decoder<Object> {
    private final Map<String, String> dict = new HashMap<>();

    void processSimpleCsvRow(String string) {
      String[] row = string.split(","); // dumbest csv parser ever... :)
      getDict().put(row[0], row[1]);
    }

    public Map<String, String> getDict() {
      return dict;
    }

    @Override
    public TestObject decode(InputStream inputStream) {
      // loading a tiny csv like:
      //
      // foo,bar
      // baz,bam
      try (Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream, Charset.forName("UTF-8"))).lines()) {
        lines.forEach(this::processSimpleCsvRow);
      } catch (Exception e) {
        log.error("failed to read dictionary {}", getResourceName() );
        throw new RuntimeException("Cannot load dictionary " , e);
      }
      assertEquals("bar", dict.get("foo"));
      assertEquals("bam", dict.get("baz"));
      log.info("Loaded {} using {}", getDict().size(), this.getClass().getClassLoader());
      // if we get here we have seen the data from the blob and all we need is to test that two collections
      // are able to see the same object..
      return new TestObject();
    }
  }

  /** Mutable holder for the name of the last collection that handled a query. */
  public static class TestObject {
    public static final String NEVER_UPDATED = "never updated";
    private volatile String lastCollection = NEVER_UPDATED;

    public String getLastCollection() {
      return this.lastCollection;
    }

    public void setLastCollection(String lastCollection) {
      this.lastCollection = lastCollection;
    }
  }
}