LUCENE-9191: make LineFileDocs random seeking more efficient by recording safe skip points in the concatenated gzip'd chunks

This commit is contained in:
Mike McCandless 2020-04-21 12:09:17 -04:00
parent c9cd623a62
commit e0c06ee6a6
5 changed files with 338 additions and 34 deletions

View File

@ -0,0 +1,247 @@
import os
import gzip
import time
import random
import re
import urllib.request
import subprocess
import tempfile
import shutil
DEBUG = False
TARGET_DOC_CHARS = 1024
def compress_with_seek_points(file_name_in, file_name_out, num_seek_points):
bytes_per_chunk = os.path.getsize(file_name_in) / num_seek_points
seek_points = []
if os.path.exists(file_name_out):
os.remove(file_name_out)
with open(file_name_in, 'rb') as f_in:
f_out = None
bytes_in_chunk = 0
chunk_count = 0
while True:
if f_out is None:
if os.path.exists(file_name_out):
seek_points.append(os.path.getsize(file_name_out))
print(' create chunk %s at pos=%s' % (chunk_count, seek_points[-1]))
else:
print(' create chunk %s at pos=0' % chunk_count)
f_out = gzip.open(file_name_out, 'ab')
chunk_count += 1
line = f_in.readline()
if len(line) == 0:
break
bytes_in_chunk += len(line)
f_out.write(line)
if bytes_in_chunk > bytes_per_chunk and chunk_count < num_seek_points:
f_out.close()
f_out = None
bytes_in_chunk = 0
with open(file_name_out[:-3] + '.seek', 'w') as f_out:
for seek_point in seek_points:
f_out.write('%d\n' % seek_point)
re_tag = re.compile('<[^>]+?>')
re_newlines = re.compile('\n+')
re_space = re.compile('\s')
# used to find word break, for splitting docs into ~1 KB sized smaller docs:
re_next_non_word_character = re.compile('\W', re.U)
EUROPARL_V7_URL = 'https://www.statmt.org/europarl/v7/europarl.tgz'
def split_docs(all_out, title_string, date_string, body_string):
'''
Splits docs into smallish (~1 KB) sized docs, repeating same title and date
'''
doc_count = 0
while len(body_string) > 0:
char_count = int(random.gauss(TARGET_DOC_CHARS, TARGET_DOC_CHARS/4))
if char_count < 64:
# trimmed normal?
continue
m = re_next_non_word_character.search(body_string, char_count)
if m is not None:
char_count = m.start(0)
else:
char_count = len(body_string)
body_string_fragment = body_string[:char_count].strip()
#print('write title %d, body %d' % (len(title_string), len(body_string_fragment)))
all_out.write('%s\t%s\t%s\n' % (title_string, date_string, body_string_fragment))
body_string = body_string[char_count:]
doc_count += 1
return doc_count
def sample_europarl():
# download europarl.tgz v7, if not already here (in cwd):
file_name = 'europarl.tgz'
if not os.path.exists(file_name):
print('Download %s to %s...' % (EUROPARL_V7_URL, file_name))
urllib.request.urlretrieve(EUROPARL_V7_URL, file_name + '.tmp')
os.rename(file_name + '.tmp', file_name)
else:
print('%s already here; skipping download...' % file_name)
if not DEBUG:
tmp_dir_path = tempfile.mkdtemp()
else:
tmp_dir_path = '/tmp/tmp31ekzg75'
print('Using tmp dir "%s"...' % tmp_dir_path)
try:
if not DEBUG:
cmd = 'tar xzf %s -C %s' % (file_name, tmp_dir_path)
print('Run: %s' % cmd)
subprocess.run(cmd, shell=True)
doc_count = 0
skip_count = 0
file_count = 0
all_txt_file_name = '%s/all.txt' % tmp_dir_path
print('Extract text...')
start_time = time.time()
next_print_time = start_time + 3
# normalize text a bit and concatenate all lines into single file, counting total lines/bytes
with open(all_txt_file_name, 'w', encoding='utf-8') as all_out:
for dir_path, dir_names, file_names in os.walk('%s/txt' % tmp_dir_path):
for file_name in file_names:
if file_name.endswith('.txt'):
file_count += 1
year, month, day = (int(x) for x in file_name[3:-4].split('-')[:3])
if year >= 50:
year = 1900 + year
else:
year = 2000 + year
date_string = '%04d-%02d-%02d' % (year, month, day)
# unfortunately we need errors='ignore' since in Europarl v7, one file (pl/ep-09-10-22-009.txt) has invalid utf-8:
chapter_count = 0
with open('%s/%s' % (dir_path, file_name), 'r', encoding='utf-8', errors='ignore') as f_in:
last_text = []
last_title = None
while True:
line = f_in.readline()
if line == '':
break
line = line.strip()
if line.startswith('<CHAPTER '):
if last_title is not None:
s = ' '.join(last_text)
s = re_tag.sub(' ', s)
s = re_newlines.sub(' ', s)
s = s.strip()
if len(s) > 0:
doc_count += split_docs(all_out, last_title, date_string, s)
else:
skip_count += 1
last_text = []
chapter_count += 1
while True:
last_title = f_in.readline()
if last_title == '':
last_title = None
break
last_title = re_tag.sub(' ', last_title).strip()
if len(last_title) > 0:
break
continue
else:
last_text.append(line)
if last_title is not None:
s = ' '.join(last_text)
s = re_tag.sub(' ', s)
s = re_newlines.sub(' ', s)
s = s.strip()
if len(s) > 0:
doc_count += split_docs(all_out, last_title, date_string, s)
else:
skip_count += 1
chapter_count += 1
else:
skip_count += 1
if chapter_count > 0:
#print('%s/%s: %d chapters' % (dir_path, file_name, chapter_count))
pass
now = time.time()
if now > next_print_time:
print('%4.1fs: keep %.2f K of %.2f K files (%.1f%%), %.2f M docs, %.2f GB...' % \
(now - start_time, (file_count - skip_count) / 1000, file_count / 1000,
100 * (file_count - skip_count) / file_count,
doc_count / 1000000, all_out.tell() / 1024/1024/1024))
while next_print_time < now:
next_print_time += 3
total_mb = os.path.getsize(all_txt_file_name)/1024/1024
now = time.time()
print('%4.1fs (done): keep %.2f K of %.2f K files (%.1f%%), %.2f M docs, %.2f GB...' % \
(now - start_time, (file_count - skip_count) / 1000, file_count / 1000,
100 * (file_count - skip_count) / file_count,
doc_count / 1000000, os.path.getsize(all_txt_file_name) / 1024/1024/1024))
print('Shuffle...')
subprocess.run('shuf %s > %s.shuffled' % (all_txt_file_name, all_txt_file_name), shell=True)
for mb in (20, 200, 2000):
print('Sample %d MB file...' % mb)
file_name_out = '%dmb.txt' % mb
with open(file_name_out, 'w', encoding='utf-8') as f_out:
chance = mb / total_mb
with open(all_txt_file_name + '.shuffled', 'r', encoding='utf-8') as f:
while True:
line = f.readline()
if len(line) == 0:
break
if random.random() <= chance:
f_out.write(line)
print(' got %.2f MB' % (os.path.getsize(file_name_out)/1024/1024))
compress_with_seek_points(file_name_out,
file_name_out + '.gz',
mb)
finally:
print('Removing tmp dir "%s"...' % tmp_dir_path)
if not DEBUG:
shutil.rmtree(tmp_dir_path)
print('\nWARNING: left ./europarl.tgz, which you should delete if you do not want it!\n')
if False:
compress_with_seek_points('/x/tmp/europarl.lines.txt',
'/x/tmp/foo.txt.gz',
16)
else:
sample_europarl()

View File

@ -204,6 +204,9 @@ Other
* LUCENE-9271: ByteBufferIndexInput was refactored to work on top of the
ByteBuffer API. (Adrien Grand)
* LUCENE-9191: Make LineFileDocs's random seeking more efficient, making tests using LineFileDocs faster (Robert Muir,
Mike McCandless)
======================= Lucene 8.5.1 =======================
Bug Fixes

View File

@ -29,6 +29,8 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPInputStream;
@ -60,7 +62,7 @@ public class LineFileDocs implements Closeable {
public LineFileDocs(Random random, String path) throws IOException {
this.path = path;
this.random = new Random(random.nextLong());
open(random);
open();
}
public LineFileDocs(Random random) throws IOException {
@ -74,24 +76,31 @@ public class LineFileDocs implements Closeable {
}
private long randomSeekPos(Random random, long size) {
if (random == null || size <= 3L)
if (random == null || size <= 3L) {
return 0L;
} else {
return (random.nextLong()&Long.MAX_VALUE) % (size/3);
}
}
private synchronized void open(Random random) throws IOException {
private synchronized void open() throws IOException {
InputStream is = getClass().getResourceAsStream(path);
boolean needSkip = true;
// true if the InputStream is not already randomly seek'd after the if/else block below:
boolean needSkip;
boolean skipFirstLineFragment = false;
long size = 0L, seekTo = 0L;
if (is == null) {
// if it's not in classpath, we load it as absolute filesystem path (e.g. Hudson's home dir)
// if it's not in classpath, we load it as absolute filesystem path (e.g. Jenkins' home dir)
Path file = Paths.get(path);
size = Files.size(file);
if (path.endsWith(".gz")) {
// if it is a gzip file, we need to use InputStream and slowly skipTo:
// if it is a gzip file, we need to use InputStream and seek to one of the pre-computed skip points:
is = Files.newInputStream(file);
needSkip = true;
} else {
// optimized seek using SeekableByteChannel
// file is not compressed: optimized seek using SeekableByteChannel
seekTo = randomSeekPos(random, size);
final SeekableByteChannel channel = Files.newByteChannel(file);
if (LuceneTestCase.VERBOSE) {
@ -99,35 +108,61 @@ public class LineFileDocs implements Closeable {
}
channel.position(seekTo);
is = Channels.newInputStream(channel);
// we (likely) seeked to the middle of a line:
skipFirstLineFragment = true;
needSkip = false;
}
} else {
// if the file comes from Classpath:
size = is.available();
needSkip = true;
}
if (path.endsWith(".gz")) {
is = new GZIPInputStream(is);
// guestimate:
size *= 2.8;
}
// If we only have an InputStream, we need to seek now,
// but this seek is a scan, so very inefficient!!!
if (needSkip) {
seekTo = randomSeekPos(random, size);
// LUCENE-9191: use the optimized (pre-computed, using dev-tools/scripts/create_line_file_docs.py)
// seek file, so we can seek in a gzip'd file
int index = path.lastIndexOf('.');
if (index == -1) {
throw new IllegalArgumentException("could not determine extension for path \"" + path + "\"");
}
// e.g. foo.txt --> foo.seek, foo.txt.gz --> foo.txt.seek
String seekFilePath = path.substring(0, index) + ".seek";
InputStream seekIS = getClass().getResourceAsStream(seekFilePath);
if (seekIS == null) {
seekIS = Files.newInputStream(Paths.get(seekFilePath));
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(seekIS,
StandardCharsets.UTF_8))) {
List<Long> skipPoints = new ArrayList<>();
// explicitly insert implicit 0 as the first skip point:
skipPoints.add(0L);
while (true) {
String line = reader.readLine();
if (line == null) {
break;
}
skipPoints.add(Long.parseLong(line.trim()));
}
seekTo = skipPoints.get(random.nextInt(skipPoints.size()));
// dev-tools/scripts/create_line_file_docs.py ensures this is a "safe" skip point, and we
// can begin gunziping from here:
is.skip(seekTo);
is = new GZIPInputStream(is);
if (LuceneTestCase.VERBOSE) {
System.out.println("TEST: LineFileDocs: stream skip to fp=" + seekTo + " on open");
}
is.skip(seekTo);
}
// if we seeked somewhere, read until newline char
if (seekTo > 0L) {
int b;
do {
b = is.read();
} while (b >= 0 && b != 13 && b != 10);
}
CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
@ -135,16 +170,16 @@ public class LineFileDocs implements Closeable {
.onUnmappableCharacter(CodingErrorAction.REPORT);
reader = new BufferedReader(new InputStreamReader(is, decoder), BUFFER_SIZE);
if (seekTo > 0L) {
// read one more line, to make sure we are not inside a Windows linebreak (\r\n):
if (skipFirstLineFragment) {
// read until end of line:
reader.readLine();
}
}
public synchronized void reset(Random random) throws IOException {
public synchronized void reset() throws IOException {
reader.close();
reader = null;
open(random);
open();
id.set(0);
}
@ -209,7 +244,7 @@ public class LineFileDocs implements Closeable {
}
reader.close();
reader = null;
open(null);
open();
line = reader.readLine();
}
}

View File

@ -0,0 +1,19 @@
485680
971105
1452723
1940496
2424299
2909775
3397858
3881659
4365085
4849651
5334212
5817709
6306315
6790697
7276765
7760619
8245188
8733672
9220523