SOLR-15142: Allow the cat Streaming Expression to read gzip files

This commit is contained in:
Joel Bernstein 2021-02-08 15:07:09 -05:00
parent ed2eebfa4d
commit da8b8ecdb8
2 changed files with 45 additions and 4 deletions

View File

@ -17,7 +17,10 @@
package org.apache.solr.handler; package org.apache.solr.handler;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
@ -25,6 +28,7 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.stream.Stream; import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator; import org.apache.commons.io.LineIterator;
@ -180,7 +184,12 @@ public class CatStream extends TupleStream implements Expressible {
while (allFilesToCrawl.hasNext()) { while (allFilesToCrawl.hasNext()) {
closeCurrentFileIfSet(); closeCurrentFileIfSet();
currentFilePath = allFilesToCrawl.next(); currentFilePath = allFilesToCrawl.next();
currentFileLines = FileUtils.lineIterator(currentFilePath.absolutePath.toFile(), "UTF-8"); File currentFile = currentFilePath.absolutePath.toFile();
if(currentFile.getName().endsWith(".gz")) {
currentFileLines = new LineIterator(new InputStreamReader(new GZIPInputStream(new FileInputStream(currentFile)), "UTF-8"));
} else {
currentFileLines = FileUtils.lineIterator(currentFile, "UTF-8");
}
if (currentFileLines.hasNext()) return true; if (currentFileLines.hasNext()) return true;
} }

View File

@ -16,9 +16,7 @@
*/ */
package org.apache.solr.client.solrj.io.stream; package org.apache.solr.client.solrj.io.stream;
import java.io.BufferedWriter; import java.io.*;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
@ -30,6 +28,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import java.util.zip.GZIPOutputStream;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
@ -3483,6 +3482,28 @@ public class StreamExpressionTest extends SolrCloudTestCase {
} }
} }
@Test
public void testCatStreamSingleGzipFile() throws Exception {
final String catStream = "cat(\"topLevel1.txt.gz\")";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", catStream);
paramsLoc.set("qt", "/stream");
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+FILESTREAM_COLLECTION;
SolrStream solrStream = new SolrStream(url, paramsLoc);
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
assertEquals(4, tuples.size());
for (int i = 0; i < 4; i++) {
Tuple t = tuples.get(i);
assertEquals("topLevel1.txt.gz line " + String.valueOf(i+1), t.get("line"));
assertEquals("topLevel1.txt.gz", t.get("file"));
}
}
@Test @Test
public void testCatStreamEmptyFile() throws Exception { public void testCatStreamEmptyFile() throws Exception {
final String catStream = "cat(\"topLevel-empty.txt\")"; final String catStream = "cat(\"topLevel-empty.txt\")";
@ -3648,6 +3669,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
Files.createDirectories(dataDir); Files.createDirectories(dataDir);
Files.createDirectories(dataDir.resolve("directory1")); Files.createDirectories(dataDir.resolve("directory1"));
populateFileWithGzipData(dataDir.resolve("topLevel1.txt.gz"));
populateFileWithData(dataDir.resolve("topLevel1.txt")); populateFileWithData(dataDir.resolve("topLevel1.txt"));
populateFileWithData(dataDir.resolve("topLevel2.txt")); populateFileWithData(dataDir.resolve("topLevel2.txt"));
Files.createFile(dataDir.resolve("topLevel-empty.txt")); Files.createFile(dataDir.resolve("topLevel-empty.txt"));
@ -3665,6 +3687,16 @@ public class StreamExpressionTest extends SolrCloudTestCase {
} }
} }
private static void populateFileWithGzipData(Path dataFile) throws Exception {
Files.createFile(dataFile);
try (final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(new FileOutputStream(dataFile.toFile())), StandardCharsets.UTF_8))) {
for (int i = 1; i <=4; i++) {
writer.write(dataFile.getFileName() + " line " + i);
writer.newLine();
}
}
}
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException { protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
List<Tuple> tuples = new ArrayList<Tuple>(); List<Tuple> tuples = new ArrayList<Tuple>();