mirror of https://github.com/apache/lucene.git
SOLR-13821: refactored the code to change the API to suit package loader
This commit is contained in:
parent
84126ea0ea
commit
88f457ee2a
|
@ -580,6 +580,9 @@ public class CoreContainer {
|
|||
return replayUpdatesExecutor;
|
||||
}
|
||||
|
||||
public PackageStoreAPI getPackageStoreAPI() {
|
||||
return packageStoreAPI;
|
||||
}
|
||||
//-------------------------------------------------------------------
|
||||
// Initialization / Cleanup
|
||||
//-------------------------------------------------------------------
|
||||
|
|
|
@ -29,12 +29,10 @@ import java.nio.file.Path;
|
|||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
|
@ -45,7 +43,6 @@ import org.apache.solr.common.SolrException;
|
|||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.BlobRepository;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.filestore.PackageStoreAPI.MetaData;
|
||||
import org.apache.zookeeper.server.ByteBufferInputStream;
|
||||
|
@ -62,6 +59,7 @@ public class DistribPackageStore implements PackageStore {
|
|||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private final CoreContainer coreContainer;
|
||||
private Map<String, FileInfo> tmpFiles = new ConcurrentHashMap<>();
|
||||
|
||||
public DistribPackageStore(CoreContainer coreContainer) {
|
||||
this.coreContainer = coreContainer;
|
||||
ensurePackageStoreDir(coreContainer.getResourceLoader().getInstancePath());
|
||||
|
@ -73,23 +71,13 @@ public class DistribPackageStore implements PackageStore {
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* get a list of nodes randomly shuffled
|
||||
* * @lucene.internal
|
||||
*/
|
||||
public ArrayList<String> shuffledNodes() {
|
||||
Set<String> liveNodes = coreContainer.getZkController().getZkStateReader().getClusterState().getLiveNodes();
|
||||
ArrayList<String> l = new ArrayList(liveNodes);
|
||||
l.remove(myNode());
|
||||
Collections.shuffle(l, BlobRepository.RANDOM);
|
||||
return l;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public Path getRealpath(String path) {
|
||||
if (File.separatorChar == '\\') {
|
||||
path = path.replace('/' , File.separatorChar);
|
||||
path = path.replace('/', File.separatorChar);
|
||||
}
|
||||
if (path.charAt(0) != File.separatorChar) {
|
||||
path = File.separator + path;
|
||||
|
@ -182,7 +170,7 @@ public class DistribPackageStore implements PackageStore {
|
|||
try {
|
||||
IOUtils.deleteFilesIfExist(getRealpath(path), getRealpath(getMetaPath()));
|
||||
} catch (IOException e) {
|
||||
log.error("Unable to delete files: "+path);
|
||||
log.error("Unable to delete files: " + path);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -224,8 +212,7 @@ public class DistribPackageStore implements PackageStore {
|
|||
}
|
||||
|
||||
boolean fetchFromAnyNode() {
|
||||
|
||||
ArrayList<String> l = shuffledNodes();
|
||||
ArrayList<String> l = coreContainer.getPackageStoreAPI().shuffledNodes();
|
||||
ZkStateReader stateReader = coreContainer.getZkController().getZkStateReader();
|
||||
for (String liveNode : l) {
|
||||
try {
|
||||
|
@ -273,17 +260,15 @@ public class DistribPackageStore implements PackageStore {
|
|||
}
|
||||
|
||||
|
||||
|
||||
|
||||
public FileDetails getDetails() {
|
||||
FileType type = getType(path);
|
||||
FileType type = getType(path, false);
|
||||
|
||||
return new FileDetails() {
|
||||
@Override
|
||||
public MetaData getMetaData() {
|
||||
try {
|
||||
return readMetaData();
|
||||
} catch (Exception e){
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
@ -307,18 +292,17 @@ public class DistribPackageStore implements PackageStore {
|
|||
return;
|
||||
}
|
||||
ew.put("timestamp", getTimeStamp());
|
||||
metaData.writeMap(ew);
|
||||
if(metaData != null)
|
||||
metaData.writeMap(ew);
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
|
||||
public void readData(Consumer<FileEntry> consumer) throws IOException {
|
||||
MetaData meta = readMetaData();
|
||||
try (InputStream is = new FileInputStream(realPath().toFile())) {
|
||||
consumer.accept(new FileEntry(null, meta,path ){
|
||||
consumer.accept(new FileEntry(null, meta, path) {
|
||||
@Override
|
||||
public InputStream getInputStream() {
|
||||
return is;
|
||||
|
@ -337,10 +321,10 @@ public class DistribPackageStore implements PackageStore {
|
|||
byte[] bytes = baos.toByteArray();
|
||||
info.persistToFile(entry.buf, ByteBuffer.wrap(bytes, 0, bytes.length));
|
||||
tmpFiles.put(entry.getPath(), info);
|
||||
List<String> nodes = shuffledNodes();
|
||||
List<String> nodes = coreContainer.getPackageStoreAPI().shuffledNodes();
|
||||
int i = 0;
|
||||
int FETCHFROM_SRC = 50;
|
||||
String myNodeName = myNode();
|
||||
String myNodeName = coreContainer.getZkController().getNodeName();
|
||||
try {
|
||||
for (String node : nodes) {
|
||||
String baseUrl = coreContainer.getZkController().getZkStateReader().getBaseUrlForNodeName(node);
|
||||
|
@ -395,11 +379,11 @@ public class DistribPackageStore implements PackageStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean fetch(String path, String from) {
|
||||
public boolean fetch(String path, String from) {
|
||||
if (path == null || path.isEmpty()) return false;
|
||||
FileInfo f = new FileInfo(path);
|
||||
try {
|
||||
if(f.exists(true, false)){
|
||||
if (f.exists(true, false)) {
|
||||
return true;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
@ -419,7 +403,7 @@ public class DistribPackageStore implements PackageStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void get(String path, Consumer<FileEntry> consumer) throws IOException {
|
||||
public void get(String path, Consumer<FileEntry> consumer, boolean fetchmissing) throws IOException {
|
||||
File file = getRealpath(path).toFile();
|
||||
String simpleName = file.getName();
|
||||
if (isMetaDataFile(simpleName)) {
|
||||
|
@ -440,10 +424,10 @@ public class DistribPackageStore implements PackageStore {
|
|||
|
||||
|
||||
@Override
|
||||
public synchronized List list(String path, Predicate<String> predicate) {
|
||||
public List list(String path, Predicate<String> predicate) {
|
||||
File file = getRealpath(path).toFile();
|
||||
List<FileDetails> fileDetails = new ArrayList<>();
|
||||
FileType type = getType(path);
|
||||
FileType type = getType(path, false);
|
||||
if (type == FileType.DIRECTORY) {
|
||||
file.list((dir, name) -> {
|
||||
if (predicate == null || predicate.test(name)) {
|
||||
|
@ -455,7 +439,6 @@ public class DistribPackageStore implements PackageStore {
|
|||
});
|
||||
|
||||
} else if (type == FileType.FILE) {
|
||||
|
||||
fileDetails.add(new FileInfo(path).getDetails());
|
||||
}
|
||||
|
||||
|
@ -464,9 +447,14 @@ public class DistribPackageStore implements PackageStore {
|
|||
|
||||
|
||||
@Override
|
||||
public synchronized FileType getType(String path) {
|
||||
public FileType getType(String path, boolean fetchMissing) {
|
||||
File file = getRealpath(path).toFile();
|
||||
if (!file.exists()) return FileType.NOFILE;
|
||||
if (!file.exists() && fetchMissing) {
|
||||
if (fetch(path, null)) {
|
||||
file = getRealpath(path).toFile();
|
||||
}
|
||||
if (!file.exists()) return FileType.NOFILE;
|
||||
}
|
||||
if (file.isDirectory()) return FileType.DIRECTORY;
|
||||
return isMetaDataFile(file.getName()) ? FileType.METADATA : FileType.FILE;
|
||||
}
|
||||
|
|
|
@ -45,11 +45,11 @@ public interface PackageStore {
|
|||
/**
|
||||
* read file content from a given path
|
||||
*/
|
||||
void get(String path, Consumer<FileEntry> filecontent) throws IOException;
|
||||
void get(String path, Consumer<FileEntry> filecontent, boolean getMissing) throws IOException;
|
||||
|
||||
/**
|
||||
* Fetch a resource from another node
|
||||
* internal
|
||||
* internal API
|
||||
*/
|
||||
boolean fetch(String path, String from);
|
||||
|
||||
|
@ -63,7 +63,7 @@ public interface PackageStore {
|
|||
/**
|
||||
* The type of the resource
|
||||
*/
|
||||
FileType getType(String path);
|
||||
FileType getType(String path, boolean fetchMissing);
|
||||
|
||||
public class FileEntry {
|
||||
final ByteBuffer buf;
|
||||
|
|
|
@ -18,13 +18,18 @@
|
|||
package org.apache.solr.filestore;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.solr.api.Command;
|
||||
|
@ -39,6 +44,7 @@ import org.apache.solr.common.params.SolrParams;
|
|||
import org.apache.solr.common.util.ContentStream;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.BlobRepository;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
|
@ -75,6 +81,50 @@ public class PackageStoreAPI {
|
|||
return packageStore;
|
||||
}
|
||||
|
||||
/**
|
||||
* get a list of nodes randomly shuffled
|
||||
* * @lucene.internal
|
||||
*/
|
||||
public ArrayList<String> shuffledNodes() {
|
||||
Set<String> liveNodes = coreContainer.getZkController().getZkStateReader().getClusterState().getLiveNodes();
|
||||
ArrayList<String> l = new ArrayList(liveNodes);
|
||||
l.remove(coreContainer.getZkController().getNodeName());
|
||||
Collections.shuffle(l, BlobRepository.RANDOM);
|
||||
return l;
|
||||
}
|
||||
|
||||
public void validateFiles(List<String> files, boolean validateSignatures, Consumer<String> errs) {
|
||||
for (String path : files) {
|
||||
try {
|
||||
PackageStore.FileType type = packageStore.getType(path, true);
|
||||
if (type != PackageStore.FileType.FILE) {
|
||||
errs.accept("No such file : " + path);
|
||||
continue;
|
||||
}
|
||||
|
||||
packageStore.get(path, entry -> {
|
||||
if (entry.getMetaData().signatures == null ||
|
||||
entry.getMetaData().signatures.isEmpty()) {
|
||||
errs.accept(path + " has no signature");
|
||||
return;
|
||||
}
|
||||
if (validateSignatures) {
|
||||
try {
|
||||
validate(entry.meta.signatures, entry);
|
||||
} catch (SolrException e) {
|
||||
log.error("error validating package artifact", e);
|
||||
errs.accept(e.getMessage());
|
||||
}
|
||||
}
|
||||
}, false);
|
||||
} catch (Exception e) {
|
||||
log.error("Error reading file ", e);
|
||||
errs.accept("Error reading file " + path + " " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@EndPoint(
|
||||
path = "/cluster/files/*",
|
||||
method = SolrRequest.METHOD.PUT,
|
||||
|
@ -187,7 +237,7 @@ public class PackageStoreAPI {
|
|||
path = "";
|
||||
}
|
||||
|
||||
PackageStore.FileType typ = packageStore.getType(path);
|
||||
PackageStore.FileType typ = packageStore.getType(path, false);
|
||||
if (typ == PackageStore.FileType.NOFILE) {
|
||||
rsp.add("files", Collections.singletonMap(path, null));
|
||||
return;
|
||||
|
@ -221,7 +271,7 @@ public class PackageStoreAPI {
|
|||
} catch (IOException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reading file" + path);
|
||||
}
|
||||
});
|
||||
}, false);
|
||||
|
||||
});
|
||||
}
|
||||
|
@ -270,4 +320,36 @@ public class PackageStoreAPI {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void validate(List<String> sigs,
|
||||
PackageStore.FileEntry entry) throws SolrException {
|
||||
Map<String, byte[]> keys = CloudUtil.getTrustedKeys(
|
||||
coreContainer.getZkController().getZkClient(), "exe");
|
||||
if (keys == null || keys.isEmpty()) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"ZooKeeper does not have any public keys");
|
||||
}
|
||||
CryptoKeys cryptoKeys = null;
|
||||
try {
|
||||
cryptoKeys = new CryptoKeys(keys);
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Error parsing public keys in ZooKeeper");
|
||||
}
|
||||
for (String sig : sigs) {
|
||||
Supplier<String> errMsg = () -> "Signature does not match any public key : " + sig;
|
||||
if (entry.getBuffer() != null) {
|
||||
if (cryptoKeys.verify(sig, entry.getBuffer()) == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, errMsg.get());
|
||||
}
|
||||
} else {
|
||||
InputStream inputStream = entry.getInputStream();
|
||||
if (cryptoKeys.verify(sig, inputStream) == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, errMsg.get());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -87,6 +87,26 @@ public final class CryptoKeys implements CLIO {
|
|||
return null;
|
||||
}
|
||||
|
||||
public String verify(String sig, InputStream is) {
|
||||
exception = null;
|
||||
for (Map.Entry<String, PublicKey> entry : keys.entrySet()) {
|
||||
boolean verified;
|
||||
try {
|
||||
verified = CryptoKeys.verify(entry.getValue(), Base64.base64ToByteArray(sig), is);
|
||||
log.debug("verified {} ", verified);
|
||||
if (verified) return entry.getKey();
|
||||
} catch (Exception e) {
|
||||
exception = e;
|
||||
log.debug("NOT verified ");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Create PublicKey from a .DER file
|
||||
|
|
|
@ -51,7 +51,7 @@ import org.apache.zookeeper.server.ByteBufferInputStream;
|
|||
import static org.apache.solr.common.util.Utils.JAVABINCONSUMER;
|
||||
import static org.apache.solr.core.TestDynamicLoading.getFileContent;
|
||||
|
||||
@LogLevel("org.apache.solr.core.PackageStoreAPI=DEBUG;org.apache.solr.core.DistribPackageStore=DEBUG")
|
||||
@LogLevel("org.apache.solr.filestore.PackageStoreAPI=DEBUG;org.apache.solr.filestore.DistribPackageStore=DEBUG")
|
||||
public class TestDistribPackageStore extends SolrCloudTestCase {
|
||||
|
||||
public void testPackageStoreManagement() throws Exception {
|
||||
|
@ -104,54 +104,26 @@ public class TestDistribPackageStore extends SolrCloudTestCase {
|
|||
)
|
||||
);
|
||||
|
||||
class Fetcher implements Callable {
|
||||
String url;
|
||||
JettySolrRunner jetty;
|
||||
Fetcher(String s, JettySolrRunner jettySolrRunner){
|
||||
this.url = s;
|
||||
this.jetty = jettySolrRunner;
|
||||
}
|
||||
@Override
|
||||
public NavigableObject call() throws Exception {
|
||||
try (HttpSolrClient solrClient = (HttpSolrClient) jetty.newClient()) {
|
||||
return (NavigableObject) Utils.executeGET(solrClient.getHttpClient(), this.url, JAVABINCONSUMER);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return url;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Map expected = Utils.makeMap(
|
||||
":files:/package/mypkg/v1.0/runtimelibs.jar:name", "runtimelibs.jar",
|
||||
":files:/package/mypkg/v1.0[0]:sha512", "d01b51de67ae1680a84a813983b1de3b592fc32f1a22b662fc9057da5953abd1b72476388ba342cad21671cd0b805503c78ab9075ff2f3951fdf75fa16981420"
|
||||
|
||||
);
|
||||
for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) {
|
||||
String baseUrl = jettySolrRunner.getBaseUrl().toString().replace("/solr", "/api");
|
||||
String url = baseUrl + "/node/files/package/mypkg/v1.0/runtimelibs.jar?wt=javabin&meta=true";
|
||||
waitForAllNodesHaveFile(cluster,"/package/mypkg/v1.0/runtimelibs.jar", expected, true);
|
||||
|
||||
assertResponseValues(10, new Fetcher(url, jettySolrRunner), expected);
|
||||
|
||||
try (HttpSolrClient solrClient = (HttpSolrClient) jettySolrRunner.newClient()) {
|
||||
ByteBuffer buf = Utils.executeGET(solrClient.getHttpClient(), baseUrl + "/node/files/package/mypkg/v1.0/runtimelibs.jar",
|
||||
Utils.newBytesConsumer(Integer.MAX_VALUE));
|
||||
assertEquals(
|
||||
"d01b51de67ae1680a84a813983b1de3b592fc32f1a22b662fc9057da5953abd1b72476388ba342cad21671cd0b805503c78ab9075ff2f3951fdf75fa16981420",
|
||||
DigestUtils.sha512Hex(new ByteBufferInputStream(buf))
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
postFile(cluster.getSolrClient(), getFileContent("runtimecode/runtimelibs_v2.jar.bin"),
|
||||
"/package/mypkg/v1.0/runtimelibs_v2.jar",
|
||||
null
|
||||
);
|
||||
expected = Utils.makeMap(
|
||||
":files:/package/mypkg/v1.0/runtimelibs_v2.jar:name", "runtimelibs_v2.jar",
|
||||
":files:/package/mypkg/v1.0[0]:sha512",
|
||||
"bc5ce45ad281b6a08fb7e529b1eb475040076834816570902acb6ebdd809410e31006efdeaa7f78a6c35574f3504963f5f7e4d92247d0eb4db3fc9abdda5d417"
|
||||
|
||||
);
|
||||
waitForAllNodesHaveFile(cluster,"/package/mypkg/v1.0/runtimelibs_v2.jar", expected, false);
|
||||
|
||||
|
||||
expected = Utils.makeMap(
|
||||
":files:/package/mypkg/v1.0", (Predicate<Object>) o -> {
|
||||
|
@ -180,6 +152,48 @@ public class TestDistribPackageStore extends SolrCloudTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public static void waitForAllNodesHaveFile(MiniSolrCloudCluster cluster, String path, Map expected , boolean verifyContent) throws Exception {
|
||||
for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) {
|
||||
String baseUrl = jettySolrRunner.getBaseUrl().toString().replace("/solr", "/api");
|
||||
String url = baseUrl + "/node/files" + path + "?wt=javabin&meta=true";
|
||||
assertResponseValues(10, new Fetcher(url, jettySolrRunner), expected);
|
||||
|
||||
if(verifyContent) {
|
||||
try (HttpSolrClient solrClient = (HttpSolrClient) jettySolrRunner.newClient()) {
|
||||
ByteBuffer buf = Utils.executeGET(solrClient.getHttpClient(), baseUrl + "/node/files" + path,
|
||||
Utils.newBytesConsumer(Integer.MAX_VALUE));
|
||||
assertEquals(
|
||||
"d01b51de67ae1680a84a813983b1de3b592fc32f1a22b662fc9057da5953abd1b72476388ba342cad21671cd0b805503c78ab9075ff2f3951fdf75fa16981420",
|
||||
DigestUtils.sha512Hex(new ByteBufferInputStream(buf))
|
||||
);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
static class Fetcher implements Callable {
|
||||
String url;
|
||||
JettySolrRunner jetty;
|
||||
Fetcher(String s, JettySolrRunner jettySolrRunner){
|
||||
this.url = s;
|
||||
this.jetty = jettySolrRunner;
|
||||
}
|
||||
@Override
|
||||
public NavigableObject call() throws Exception {
|
||||
try (HttpSolrClient solrClient = (HttpSolrClient) jetty.newClient()) {
|
||||
return (NavigableObject) Utils.executeGET(solrClient.getHttpClient(), this.url, JAVABINCONSUMER);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return url;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static NavigableObject assertResponseValues(int repeats, SolrClient client, SolrRequest req, Map vals) throws Exception {
|
||||
Callable<NavigableObject> callable = () -> req.process(client);
|
||||
|
||||
|
@ -223,7 +237,7 @@ public class TestDistribPackageStore extends SolrCloudTestCase {
|
|||
|
||||
|
||||
|
||||
private void postFile(SolrClient client, ByteBuffer buffer, String name, String sig)
|
||||
public static void postFile(SolrClient client, ByteBuffer buffer, String name, String sig)
|
||||
throws SolrServerException, IOException {
|
||||
String resource = "/cluster/files" + name;
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
|
|
Loading…
Reference in New Issue