SOLR-13622: Add fileStream stream-source

This commit is contained in:
Jason Gerlowski 2019-07-14 21:43:48 -04:00
parent 100c160017
commit dc8e9afff9
9 changed files with 491 additions and 11 deletions

1
solr/.gitignore vendored
View File

@ -23,6 +23,7 @@
/server/logs/
/server/solr/zoo_data/
/server/solr/userfiles/
/server/solr-webapp
/server/start.jar

View File

@ -89,6 +89,8 @@ New Features
* SOLR-13553: Node level custom RequestHandlers (noble)
* SOLR-13622: Add files() stream source to create tuples from lines in local files (Jason Gerlowski and Joel Bernstein)
Improvements
----------------------

View File

@ -801,6 +801,32 @@ public class SolrResourceLoader implements ResourceLoader,Closeable
return Paths.get(home);
}
/**
* Solr allows users to store arbitrary files in a special directory located directly under SOLR_HOME.
*
* This directory is generally created by each node on startup. Files located in this directory can then be
* manipulated using select Solr features (e.g. streaming expressions).
*/
public static final String USER_FILES_DIRECTORY = "userfiles";
/**
 * Creates the node's "userfiles" directory directly under SOLR_HOME if it is not already present.
 *
 * Creation failures are logged as warnings rather than thrown, since only select features
 * (e.g. streaming expressions reading local files) depend on this directory existing.
 *
 * @param solrHome the SOLR_HOME the directory should live directly under
 */
public static void ensureUserFilesDataDir(Path solrHome) {
  final File userFilesDir = getUserFilesPath(solrHome).toFile();
  if (userFilesDir.exists()) {
    return; // Nothing to do - directory already present.
  }
  try {
    if (!userFilesDir.mkdir()) {
      log.warn("Unable to create [{}] directory in SOLR_HOME [{}]. Features requiring this directory may fail.", USER_FILES_DIRECTORY, solrHome);
    }
  } catch (Exception e) {
    log.warn("Unable to create [" + USER_FILES_DIRECTORY + "] directory in SOLR_HOME [" + solrHome + "]. Features requiring this directory may fail.", e);
  }
}
/** Returns the absolute path of the "userfiles" directory located directly under the given SOLR_HOME. */
public static Path getUserFilesPath(Path solrHome) {
  return solrHome.toAbsolutePath().resolve(USER_FILES_DIRECTORY).toAbsolutePath();
}
// Logs a message only once per startup
private static void logOnceInfo(String key, String msg) {
if (!loggedOnce.contains(key)) {

View File

@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.StringUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A stream source that reads lines from local files found under the node's
 * {@link SolrResourceLoader#USER_FILES_DIRECTORY} ("userfiles") chroot.
 *
 * Each emitted tuple has two fields: "file" (the file's path relative to the chroot, safe to
 * display without leaking node filesystem details) and "line" (a single line of text from that
 * file).  Input paths must be relative and must not contain ".." segments, so expressions cannot
 * read arbitrary files on the Solr node.  Directories are crawled recursively in sorted filename
 * order; symlinks and unreadable entries are silently skipped.
 */
public class FilesStream extends TupleStream implements Expressible {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private final String commaDelimitedFilepaths;
  private final int maxLines; // -1 for no max

  private StreamContext context;
  private String chroot;
  private Iterator<CrawlFile> allFilesToCrawl;

  private int linesReturned = 0;
  private CrawlFile currentFilePath;
  private LineIterator currentFileLines;

  public FilesStream(StreamExpression expression, StreamFactory factory) throws IOException {
    this(factory.getValueOperand(expression, 0), factory.getIntOperand(expression, "maxLines", -1));
  }

  /**
   * @param commaDelimitedFilepaths comma-separated chroot-relative paths; surrounding quotes
   *                                (if present) are stripped
   * @param maxLines maximum number of line-tuples to emit, or a negative value for no limit
   * @throws IllegalArgumentException if no (non-empty) path list is provided
   */
  public FilesStream(String commaDelimitedFilepaths, int maxLines) {
    if (commaDelimitedFilepaths == null) {
      throw new IllegalArgumentException("No filepaths provided to stream");
    }
    final String filepathsWithoutSurroundingQuotes = stripSurroundingQuotesIfTheyExist(commaDelimitedFilepaths);
    if (StringUtils.isEmpty(filepathsWithoutSurroundingQuotes)) {
      throw new IllegalArgumentException("No filepaths provided to stream");
    }

    this.commaDelimitedFilepaths = filepathsWithoutSurroundingQuotes;
    this.maxLines = maxLines;
  }

  /** Removes one matching pair of surrounding double- or single-quotes, if present. */
  private String stripSurroundingQuotesIfTheyExist(String value) {
    if (value.length() < 2) return value;
    if ((value.startsWith("\"") && value.endsWith("\"")) || (value.startsWith("'") && value.endsWith("'"))) {
      return value.substring(1, value.length() - 1);
    }
    return value;
  }

  @Override
  public void setStreamContext(StreamContext context) {
    this.context = context;
    final Object solrCoreObj = context.get("solr-core");
    // 'instanceof' is false for null, so this single check covers both a missing and a
    // wrongly-typed "solr-core" entry (the previous explicit null check was redundant).
    if (!(solrCoreObj instanceof SolrCore)) {
      throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "StreamContext must have SolrCore in solr-core key");
    }
    final SolrCore core = (SolrCore) solrCoreObj;

    this.chroot = Paths.get(core.getCoreContainer().getSolrHome(), SolrResourceLoader.USER_FILES_DIRECTORY).toString();
    if (!new File(this.chroot).exists()) {
      throw new IllegalStateException(SolrResourceLoader.USER_FILES_DIRECTORY + " directory used to load files must exist but could not be found!");
    }
  }

  @Override
  public List<TupleStream> children() {
    return new ArrayList<>();
  }

  @Override
  public void open() throws IOException {
    final List<CrawlFile> initialCrawlSeeds = validateAndSetFilepathsInSandbox();

    final List<CrawlFile> filesToCrawl = new ArrayList<>();
    for (CrawlFile crawlSeed : initialCrawlSeeds) {
      findReadableFiles(crawlSeed, filesToCrawl);
    }

    log.debug("Found files [{}] to stream from roots: [{}]", filesToCrawl, initialCrawlSeeds);
    this.allFilesToCrawl = filesToCrawl.iterator();
  }

  @Override
  public void close() throws IOException {
    // Release the underlying reader of any file still being streamed (e.g. when the consumer
    // stops before EOF).  Previously this was a no-op and the reader leaked.
    if (currentFileLines != null) {
      currentFileLines.close();
    }
  }

  /**
   * Returns the next line-tuple, advancing across files as each is exhausted, or an EOF tuple
   * once all files (or {@code maxLines}) have been consumed.
   */
  @Override
  public Tuple read() throws IOException {
    if (maxLines >= 0 && linesReturned >= maxLines) {
      if (currentFileLines != null) currentFileLines.close();
      return createEofTuple();
    } else if (currentFileHasMoreLinesToRead()) {
      return fetchNextLineFromCurrentFile();
    } else if (advanceToNextFileWithData()) {
      return fetchNextLineFromCurrentFile();
    } else { // No more data
      return createEofTuple();
    }
  }

  @Override
  public StreamComparator getStreamSort() {
    // Tuples are emitted in file-crawl order; no sort is imposed.
    return null;
  }

  @Override
  public StreamExpression toExpression(StreamFactory factory) throws IOException {
    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
    expression.addParameter("\"" + commaDelimitedFilepaths + "\"");
    // NOTE(review): maxLines is not serialized here, so a round-tripped expression loses any
    // line limit -- consider adding a named "maxLines" parameter to the expression.
    return expression;
  }

  @Override
  public Explanation toExplanation(StreamFactory factory) throws IOException {
    return new StreamExplanation(getStreamNodeId().toString())
        .withFunctionName(factory.getFunctionName(this.getClass()))
        .withImplementingClass(this.getClass().getName())
        .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
        .withExpression(toExpression(factory).toString());
  }

  /**
   * Parses the comma-separated path list, rejecting absolute paths and any path containing "..",
   * and resolves each surviving entry against the userfiles chroot.
   *
   * @return one crawl seed per user-provided path
   * @throws SolrException (BAD_REQUEST) for absolute paths, ".." segments, or missing files
   */
  private List<CrawlFile> validateAndSetFilepathsInSandbox() {
    final String[] relativePathRoots = commaDelimitedFilepaths.split(",");

    final List<CrawlFile> crawlSeeds = new ArrayList<>();
    for (String crawlRoot : relativePathRoots) {
      final File crawlRootFile = new File(crawlRoot);
      if (crawlRootFile.isAbsolute()) {
        // Message fixed: this branch fires when the path IS absolute, so the old text
        // ("must be provided as an absolute path") stated the requirement backwards.
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
            "file/directory to stream must be provided as a relative path, but was absolute: " + crawlRoot);
      }
      if (crawlRoot.contains("..")) {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
            "relative file/directory paths cannot contain '..': " + crawlRoot);
      }

      final String rootAbsolutePath = getAbsolutePath(crawlRoot);
      // Reuse the absolute path computed above instead of resolving it three times.
      if (!new File(rootAbsolutePath).exists()) {
        log.warn("Unable to find abs path: {}", rootAbsolutePath);
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
            "file/directory to stream doesn't exist: " + crawlRoot);
      }

      crawlSeeds.add(new CrawlFile(crawlRoot, rootAbsolutePath));
    }

    return crawlSeeds;
  }

  /** Opens successive files until one with at least one line is found, or the crawl list is exhausted. */
  private boolean advanceToNextFileWithData() throws IOException {
    while (allFilesToCrawl.hasNext()) {
      if (currentFileLines != null) {
        currentFileLines.close();
      }
      currentFilePath = allFilesToCrawl.next();
      currentFileLines = FileUtils.lineIterator(new File(currentFilePath.absolutePath), "UTF-8");
      if (currentFileLines.hasNext()) return true;
    }
    return false;
  }

  private Tuple fetchNextLineFromCurrentFile() {
    linesReturned++;

    // Typed map (rather than the previous raw HashMap) avoids unchecked-conversion warnings.
    HashMap<String, Object> m = new HashMap<>();
    m.put("file", currentFilePath.displayPath);
    m.put("line", currentFileLines.next());
    return new Tuple(m);
  }

  private Tuple createEofTuple() {
    HashMap<String, Object> m = new HashMap<>();
    m.put("EOF", true);
    return new Tuple(m);
  }

  private boolean currentFileHasMoreLinesToRead() {
    return currentFileLines != null && currentFileLines.hasNext();
  }

  private String getAbsolutePath(String pathRelativeToChroot) {
    return Paths.get(chroot, pathRelativeToChroot).toString();
  }

  /**
   * Recursively gathers regular, readable, non-symlink files under {@code seed} into
   * {@code foundFiles}.  Directory entries are visited in sorted (filename) order.
   */
  private void findReadableFiles(CrawlFile seed, List<CrawlFile> foundFiles) {
    final File entry = new File(seed.absolutePath);

    // Skip over paths that don't exist or that are symbolic links
    if ((!entry.exists()) || (!entry.canRead()) || Files.isSymbolicLink(entry.toPath())) {
      return;
    }

    // We already know that the path in question exists, is readable, and is in our sandbox
    if (entry.isFile()) {
      foundFiles.add(seed);
    } else if (entry.isDirectory()) {
      final String[] directoryContents = entry.list();
      // File.list() returns null on I/O error (or if the dir vanished); the null check must
      // run BEFORE sorting -- the previous code sorted first and could NPE.
      if (directoryContents != null) {
        Arrays.sort(directoryContents);
        for (String item : directoryContents) {
          final String itemDisplayPath = Paths.get(seed.displayPath, item).toString();
          final String itemAbsolutePath = Paths.get(seed.absolutePath, item).toString();
          findReadableFiles(new CrawlFile(itemDisplayPath, itemAbsolutePath), foundFiles);
        }
      }
    }
  }

  // A pair of paths for a particular file to stream:
  // - absolute path for reading,
  // - display path to avoid leaking Solr node fs details in tuples (relative to chroot)
  // Declared 'static' so instances don't carry a hidden reference to the enclosing FilesStream.
  public static class CrawlFile {
    private final String displayPath;
    private final String absolutePath;

    public CrawlFile(String displayPath, String absolutePath) {
      this.displayPath = displayPath;
      this.absolutePath = absolutePath;
    }
  }
}

View File

@ -34,6 +34,7 @@ public class SolrDefaultStreamFactory extends DefaultStreamFactory {
/**
 * Registers Solr-core-side streaming functions on top of those inherited from the parent
 * factory, including the "files" source (SOLR-13622) backed by {@link FilesStream}.
 */
public SolrDefaultStreamFactory() {
  super();
  this.withFunctionName("analyze", AnalyzeEvaluator.class);
  this.withFunctionName("files", FilesStream.class);
  this.withFunctionName("classify", ClassifyStream.class);
  this.withFunctionName("haversineMeters", HaversineMetersEvaluator.class);
}

View File

@ -177,9 +177,9 @@ public class SolrDispatchFilter extends BaseSolrFilter {
extraProperties = new Properties();
String solrHome = (String) config.getServletContext().getAttribute(SOLRHOME_ATTRIBUTE);
coresInit = createCoreContainer(solrHome == null ? SolrResourceLoader.locateSolrHome() : Paths.get(solrHome),
extraProperties);
final Path solrHomePath = solrHome == null ? SolrResourceLoader.locateSolrHome() : Paths.get(solrHome);
coresInit = createCoreContainer(solrHomePath, extraProperties);
SolrResourceLoader.ensureUserFilesDataDir(solrHomePath);
this.httpClient = coresInit.getUpdateShardHandler().getDefaultHttpClient();
setupJvmMetrics(coresInit);
log.debug("user.dir=" + System.getProperty("user.dir"));

View File

@ -216,6 +216,33 @@ features(collection1,
numTerms=250)
----
== files
The `files` function reads the specified files or directories and emits each line in the file(s) as a tuple.
Each emitted tuple contains two fields: `file` and `line`. `file` holds the path of the file being read, relative to the `userfiles` chroot (located directly under `$SOLR_HOME`), and `line` holds a single line from that file.
`files` is ideally used with the `update` stream to index data from the specified documents, or with the `analyze` stream to further split the lines into individual tokens for statistical processing or visualization.
=== files Parameters
* `filePaths`: (Mandatory) a comma separated list of filepaths to read lines from. If the specified path is a directory, it will be crawled recursively and all contained files will be read. To prevent malicious users from reading arbitrary files from Solr nodes, `filePaths` must be a relative path measured from a chroot of `$SOLR_HOME/userfiles` on the node running the streaming expression.
* `maxLines`: (defaults to -1) The maximum number of lines to read (and tuples to emit). If a negative value is specified, all lines in the specified files will be emitted as tuples. Files are read in the order that they appear in the comma-separated `filePaths` argument. If the line-limit is hit, it will be these later files that are partially emitted or not read at all.
=== files Examples
The following example emits all lines from a single text file located at `$SOLR_HOME/userfiles/authors.txt`:
[source,text]
----
files("authors.txt")
----
This example will read lines from `$SOLR_HOME/userfiles/authors.txt`, as well as all files (recursively) found under `$SOLR_HOME/userfiles/fiction/scifi`. Only 500 lines will be emitted, meaning that some files may be partially emitted or not read at all:
[source,text]
----
files("authors.txt,fiction/scifi/", maxLines=500)
----
== nodes
The `nodes` function provides breadth-first graph traversal. For details, see the section <<graph-traversal.adoc#graph-traversal,Graph Traversal>>.

View File

@ -46,6 +46,7 @@ public class Lang {
.withFunctionName("random", RandomFacadeStream.class)
.withFunctionName("knnSearch", KnnStream.class)
// decorator streams
.withFunctionName("merge", MergeStream.class)
.withFunctionName("unique", UniqueStream.class)

View File

@ -16,7 +16,12 @@
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -49,6 +54,8 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrResourceLoader;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
@ -60,6 +67,7 @@ import org.junit.Test;
public class StreamExpressionTest extends SolrCloudTestCase {
private static final String COLLECTIONORALIAS = "collection1";
private static final String FILESTREAM_COLLECTION = "filestream_collection";
private static final int TIMEOUT = DEFAULT_TIMEOUT;
private static final String id = "id";
@ -85,6 +93,12 @@ public class StreamExpressionTest extends SolrCloudTestCase {
if (useAlias) {
CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
}
// Create a collection for use by the filestream() expression, and place some files there for it to read.
CollectionAdminRequest.createCollection(FILESTREAM_COLLECTION, "conf", 1, 1).process(cluster.getSolrClient());
cluster.waitForActiveCollection(FILESTREAM_COLLECTION, 1, 1);
final String dataDir = findUserFilesDataDir();
populateFileStreamData(dataDir);
}
@Before
@ -713,14 +727,6 @@ public class StreamExpressionTest extends SolrCloudTestCase {
}
}
@Test
public void testStatsStream() throws Exception {
@ -3057,6 +3063,106 @@ public class StreamExpressionTest extends SolrCloudTestCase {
}
}
@Test
public void testFileStreamSingleFile() throws Exception {
  // Stream a single top-level file and verify every line comes back, in order.
  final String expr = "files(\"topLevel1.txt\")";
  final ModifiableSolrParams params = new ModifiableSolrParams();
  params.set("expr", expr);
  params.set("qt", "/stream");
  final String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + FILESTREAM_COLLECTION;

  final SolrStream solrStream = new SolrStream(url, params);
  solrStream.setStreamContext(new StreamContext());
  final List<Tuple> tuples = getTuples(solrStream);

  assertEquals(4, tuples.size());
  int lineNum = 1;
  for (Tuple tuple : tuples) {
    assertEquals("topLevel1.txt line " + lineNum, tuple.get("line"));
    assertEquals("topLevel1.txt", tuple.get("file"));
    lineNum++;
  }
}
@Test
public void testFileStreamMaxLines() throws Exception {
  // The file has 4 lines, but maxLines=2 should cap emission at the first two.
  final String expr = "files(\"topLevel1.txt\", maxLines=2)";
  final ModifiableSolrParams params = new ModifiableSolrParams();
  params.set("expr", expr);
  params.set("qt", "/stream");
  final String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + FILESTREAM_COLLECTION;

  final SolrStream solrStream = new SolrStream(url, params);
  solrStream.setStreamContext(new StreamContext());
  final List<Tuple> tuples = getTuples(solrStream);

  assertEquals(2, tuples.size());
  int lineNum = 1;
  for (Tuple tuple : tuples) {
    assertEquals("topLevel1.txt line " + lineNum, tuple.get("line"));
    assertEquals("topLevel1.txt", tuple.get("file"));
    lineNum++;
  }
}
@Test
public void testFileStreamDirectoryCrawl() throws Exception {
  // Streaming a directory should recursively emit its files in sorted order:
  // all of secondLevel1.txt, then all of secondLevel2.txt.
  final String expr = "files(\"directory1\")";
  final ModifiableSolrParams params = new ModifiableSolrParams();
  params.set("expr", expr);
  params.set("qt", "/stream");
  final String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + FILESTREAM_COLLECTION;

  final SolrStream solrStream = new SolrStream(url, params);
  solrStream.setStreamContext(new StreamContext());
  final List<Tuple> tuples = getTuples(solrStream);

  assertEquals(8, tuples.size());
  for (int i = 0; i < tuples.size(); i++) {
    final Tuple tuple = tuples.get(i);
    final boolean fromFirstFile = i < 4;
    final String expectedName = fromFirstFile ? "secondLevel1.txt" : "secondLevel2.txt";
    final int expectedLineNum = fromFirstFile ? i + 1 : i - 3;
    assertEquals(expectedName + " line " + expectedLineNum, tuple.get("line"));
    assertEquals("directory1/" + expectedName, tuple.get("file"));
  }
}
@Test
public void testFileStreamMultipleExplicitFiles() throws Exception {
  // Files listed explicitly should be read fully, in the order given in the expression.
  final String expr = "files(\"topLevel1.txt,directory1/secondLevel2.txt\")";
  final ModifiableSolrParams params = new ModifiableSolrParams();
  params.set("expr", expr);
  params.set("qt", "/stream");
  final String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + FILESTREAM_COLLECTION;

  final SolrStream solrStream = new SolrStream(url, params);
  solrStream.setStreamContext(new StreamContext());
  final List<Tuple> tuples = getTuples(solrStream);

  assertEquals(8, tuples.size());
  for (int i = 0; i < tuples.size(); i++) {
    final Tuple tuple = tuples.get(i);
    if (i < 4) {
      assertEquals("topLevel1.txt line " + (i + 1), tuple.get("line"));
      assertEquals("topLevel1.txt", tuple.get("file"));
    } else {
      assertEquals("secondLevel2.txt line " + (i - 3), tuple.get("line"));
      assertEquals("directory1/secondLevel2.txt", tuple.get("file"));
    }
  }
}
private void assertSuccess(String expr, StreamContext streamContext) throws IOException {
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
@ -3068,6 +3174,60 @@ public class StreamExpressionTest extends SolrCloudTestCase {
getTuples(solrStream);
}
/**
 * Locates the "userfiles" directory on whichever Jetty node hosts FILESTREAM_COLLECTION.
 *
 * The files() expression reads from $SOLR_HOME/userfiles on the node executing it, so test
 * fixtures must be written under the SOLR_HOME of the node hosting the collection's single core.
 *
 * @return absolute path of that node's userfiles directory
 * @throws IllegalStateException if no node hosts a core of FILESTREAM_COLLECTION
 */
private static String findUserFilesDataDir() {
  // Dropped the unused 'baseDir' local that the previous version recomputed on every iteration.
  for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
    for (CoreDescriptor coreDescriptor : jetty.getCoreContainer().getCoreDescriptors()) {
      if (coreDescriptor.getCollectionName().equals(FILESTREAM_COLLECTION)) {
        return Paths.get(jetty.getSolrHome(), SolrResourceLoader.USER_FILES_DIRECTORY).toAbsolutePath().toString();
      }
    }
  }

  throw new IllegalStateException("Unable to determine data-dir for: "+ FILESTREAM_COLLECTION);
}
/**
 * Creates a tree of files underneath a provided data-directory.
 *
 * The filetree created looks like:
 *
 * dataDir
 * |- topLevel1.txt
 * |- topLevel2.txt
 * |- directory1
 * |- secondLevel1.txt
 * |- secondLevel2.txt
 *
 * Each file contains 4 lines. Each line looks like: "&lt;filename&gt; line &lt;linenumber&gt;"
 */
private static void populateFileStreamData(String dataDir) throws Exception {
  // Files.createDirectories creates dataDir and directory1 in one shot, is a no-op for
  // directories that already exist, and (unlike the previously-ignored File.mkdir() booleans)
  // fails loudly if a directory cannot be created.
  Files.createDirectories(Paths.get(dataDir, "directory1"));

  populateFileWithData(new File(Paths.get(dataDir, "topLevel1.txt").toString()));
  populateFileWithData(new File(Paths.get(dataDir, "topLevel2.txt").toString()));
  populateFileWithData(new File(Paths.get(dataDir, "directory1", "secondLevel1.txt").toString()));
  populateFileWithData(new File(Paths.get(dataDir, "directory1", "secondLevel2.txt").toString()));
}
/**
 * Writes 4 lines of the form "&lt;filename&gt; line &lt;num&gt;" (num = 1..4) to the given file.
 *
 * Files.newBufferedWriter defaults to CREATE + TRUNCATE_EXISTING + WRITE, so it creates the
 * file itself; the previous redundant File.createNewFile() call (whose boolean result was
 * ignored) has been dropped, and the File-&gt;URI-&gt;Path detour replaced with File.toPath().
 */
private static void populateFileWithData(File dataFile) throws Exception {
  try (final BufferedWriter writer = Files.newBufferedWriter(dataFile.toPath(), StandardCharsets.UTF_8)) {
    for (int lineNum = 1; lineNum <= 4; lineNum++) {
      writer.write(dataFile.getName() + " line " + lineNum);
      writer.newLine();
    }
  }
}
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
List<Tuple> tuples = new ArrayList<Tuple>();