HBASE-26349 Improve recent change to IntegrationTestLoadCommonCrawl (#3744)

Use a hybrid logical clock for timestamping entries.

Using BufferedMutator without HLC was not good because we assign client timestamps,
and the store loop is fast enough that on rare occasion two temporally adjacent URLs
in the set of WARCs are equivalent and the timestamp does not advance, leading later
to a rare false positive CORRUPT finding.

While making changes, support direct S3N paths as input paths on the command line.

Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Andrew Purtell 2021-10-19 13:45:55 -07:00 committed by GitHub
parent bfa4584125
commit 9e73ea878d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 104 additions and 80 deletions

View File

@ -26,12 +26,12 @@ import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
@ -84,7 +84,6 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
/**
@ -163,17 +162,17 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
REFERENCED, UNREFERENCED, CORRUPT
}
Path warcFileInputDir = null;
Path outputDir = null;
String[] args;
protected Path warcFileInputDir = null;
protected Path outputDir = null;
protected String[] args;
protected int runLoader(Path warcFileInputDir, Path outputDir) throws Exception {
protected int runLoader(final Path warcFileInputDir, final Path outputDir) throws Exception {
Loader loader = new Loader();
loader.setConf(conf);
return loader.run(warcFileInputDir, outputDir);
}
protected int runVerify(Path inputDir) throws Exception {
protected int runVerify(final Path inputDir) throws Exception {
Verify verify = new Verify();
verify.setConf(conf);
return verify.run(inputDir);
@ -208,7 +207,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
}
@Override
protected void processOptions(CommandLine cmd) {
protected void processOptions(final CommandLine cmd) {
processBaseOptions(cmd);
args = cmd.getArgs();
}
@ -232,7 +231,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
}
}
static TableName getTablename(Configuration c) {
static TableName getTablename(final Configuration c) {
return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
}
@ -421,7 +420,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
private static final Logger LOG = LoggerFactory.getLogger(Loader.class);
private static final String USAGE = "Loader <warInputDir | warFileList> <outputDir>";
void createSchema(TableName tableName) throws IOException {
void createSchema(final TableName tableName) throws IOException {
try (Connection conn = ConnectionFactory.createConnection(getConf());
Admin admin = conn.getAdmin()) {
@ -477,24 +476,24 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
}
}
int run(Path warcFileInput, Path outputDir)
int run(final Path warcFileInput, final Path outputDir)
throws IOException, ClassNotFoundException, InterruptedException {
createSchema(getTablename(getConf()));
Job job = Job.getInstance(getConf());
final Job job = Job.getInstance(getConf());
job.setJobName(Loader.class.getName());
job.setNumReduceTasks(0);
job.setJarByClass(getClass());
job.setMapperClass(LoaderMapper.class);
job.setInputFormatClass(WARCInputFormat.class);
FileSystem fs = FileSystem.get(warcFileInput.toUri(), getConf());
final FileSystem fs = FileSystem.get(warcFileInput.toUri(), getConf());
if (fs.getFileStatus(warcFileInput).isDirectory()) {
LOG.info("Using directory as WARC input path: " + warcFileInput);
FileInputFormat.setInputPaths(job, warcFileInput);
} else {
} else if (warcFileInput.toUri().getScheme().equals("file")) {
LOG.info("Getting WARC input paths from file: " + warcFileInput);
List<Path> paths = new LinkedList<Path>();
final List<Path> paths = new ArrayList<Path>();
try (FSDataInputStream is = fs.open(warcFileInput)) {
InputStreamReader reader;
if (warcFileInput.getName().toLowerCase().endsWith(".gz")) {
@ -511,6 +510,8 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
}
LOG.info("Read " + paths.size() + " WARC input paths from " + warcFileInput);
FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
} else {
FileInputFormat.setInputPaths(job, warcFileInput);
}
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job, outputDir);
@ -550,20 +551,19 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
public static class LoaderMapper
extends Mapper<LongWritable, WARCWritable, HBaseKeyWritable, BytesWritable> {
Configuration conf;
Connection conn;
BufferedMutator mutator;
protected Configuration conf;
protected Connection conn;
protected BufferedMutator mutator;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
protected void setup(final Context context) throws IOException, InterruptedException {
conf = context.getConfiguration();
conn = ConnectionFactory.createConnection(conf);
mutator = conn.getBufferedMutator(getTablename(conf));
mutator.setWriteBufferPeriodicFlush(10 * 1000); // default is 1 sec, increase to 10
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
protected void cleanup(final Context context) throws IOException, InterruptedException {
try {
mutator.close();
} catch (Exception e) {
@ -577,16 +577,15 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
}
@Override
protected void map(LongWritable key, WARCWritable value, Context output)
protected void map(final LongWritable key, final WARCWritable value, final Context output)
throws IOException, InterruptedException {
WARCRecord.Header warcHeader = value.getRecord().getHeader();
String recordID = warcHeader.getRecordID();
String targetURI = warcHeader.getTargetURI();
final WARCRecord.Header warcHeader = value.getRecord().getHeader();
final String recordID = warcHeader.getRecordID();
final String targetURI = warcHeader.getTargetURI();
if (warcHeader.getRecordType().equals("response") && targetURI != null) {
String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
final String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
if (contentType != null) {
LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID);
long now = EnvironmentEdgeManager.currentTime();
// Make row key
@ -604,62 +603,63 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
// Get the content and calculate the CRC64
byte[] content = value.getRecord().getContent();
CRC64 crc = new CRC64();
final byte[] content = value.getRecord().getContent();
final CRC64 crc = new CRC64();
crc.update(content);
long crc64 = crc.getValue();
final long crc64 = crc.getValue();
// Store to HBase
Put put = new Put(rowKey);
put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, now, content);
put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, now,
final long ts = getCurrentTime();
final Put put = new Put(rowKey);
put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, ts, content);
put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts,
Bytes.toBytes(content.length));
put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, now,
put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts,
Bytes.toBytes(contentType));
put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, now, Bytes.toBytes(crc64));
put.addColumn(INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, now, Bytes.toBytes(recordID));
put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, now, Bytes.toBytes(targetURI));
put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, now,
put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, ts, Bytes.toBytes(crc64));
put.addColumn(INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, ts, Bytes.toBytes(recordID));
put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts, Bytes.toBytes(targetURI));
put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, ts,
Bytes.toBytes(warcHeader.getDateString()));
String ipAddr = warcHeader.getField("WARC-IP-Address");
final String ipAddr = warcHeader.getField("WARC-IP-Address");
if (ipAddr != null) {
put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, now, Bytes.toBytes(ipAddr));
put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr));
}
mutator.mutate(put);
// Write records out for later verification, one per HBase field except for the
// content record, which will be verified by CRC64.
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, now),
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts),
new BytesWritable(Bytes.toBytes(crc64)));
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER,
now), new BytesWritable(Bytes.toBytes(content.length)));
ts), new BytesWritable(Bytes.toBytes(content.length)));
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER,
now), new BytesWritable(Bytes.toBytes(contentType)));
ts), new BytesWritable(Bytes.toBytes(contentType)));
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, RECORD_ID_QUALIFIER,
now), new BytesWritable(Bytes.toBytes(recordID)));
ts), new BytesWritable(Bytes.toBytes(recordID)));
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, TARGET_URI_QUALIFIER,
now), new BytesWritable(Bytes.toBytes(targetURI)));
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, now),
ts), new BytesWritable(Bytes.toBytes(targetURI)));
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, ts),
new BytesWritable(Bytes.toBytes(warcHeader.getDateString())));
if (ipAddr != null) {
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER,
now), new BytesWritable(Bytes.toBytes(ipAddr)));
ts), new BytesWritable(Bytes.toBytes(ipAddr)));
}
}
}
}
private byte[] rowKeyFromTargetURI(String targetUri)
private byte[] rowKeyFromTargetURI(final String targetUri)
throws URISyntaxException, IllegalArgumentException {
URI uri = new URI(targetUri);
final URI uri = new URI(targetUri);
// Ignore the scheme
// Reverse the components of the hostname
String reversedHost;
if (uri.getHost() != null) {
StringBuffer sb = new StringBuffer();
String[] hostComponents = uri.getHost().split("\\.");
final StringBuilder sb = new StringBuilder();
final String[] hostComponents = uri.getHost().split("\\.");
for (int i = hostComponents.length - 1; i >= 0; i--) {
sb.append(hostComponents[i]);
if (i != 0) {
@ -670,7 +670,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
} else {
throw new IllegalArgumentException("URI is missing host component");
}
StringBuffer sb = new StringBuffer();
final StringBuilder sb = new StringBuilder();
sb.append(reversedHost);
if (uri.getPort() >= 0) {
sb.append(':');
@ -700,7 +700,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
public static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
@Override
protected boolean isSplitable(JobContext context, Path filename) {
protected boolean isSplitable(final JobContext context, final Path filename) {
return false;
}
}
@ -710,7 +710,8 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
public static final Logger LOG = LoggerFactory.getLogger(Verify.class);
public static final String USAGE = "Verify <inputDir>";
int run(Path inputDir) throws IOException, ClassNotFoundException, InterruptedException {
int run(final Path inputDir)
throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(getConf());
job.setJobName(Verify.class.getName());
job.setJarByClass(getClass());
@ -725,10 +726,18 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
if (!success) {
LOG.error("Failure during job " + job.getJobID());
}
Counters counters = job.getCounters();
final Counters counters = job.getCounters();
for (Counts c: Counts.values()) {
LOG.info(c + ": " + counters.findCounter(c).getValue());
}
if (counters.findCounter(Counts.UNREFERENCED).getValue() > 0) {
LOG.error("Nonzero UNREFERENCED count from job " + job.getJobID());
success = false;
}
if (counters.findCounter(Counts.CORRUPT).getValue() > 0) {
LOG.error("Nonzero CORRUPT count from job " + job.getJobID());
success = false;
}
return success ? 0 : 1;
}
@ -749,44 +758,51 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
public static class VerifyMapper
extends Mapper<HBaseKeyWritable, BytesWritable, NullWritable, NullWritable> {
Connection conn;
Table table;
private Connection conn;
private Table table;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
protected void setup(final Context context) throws IOException, InterruptedException {
conn = ConnectionFactory.createConnection(context.getConfiguration());
table = conn.getTable(getTablename(conn.getConfiguration()));
}
@Override
protected void cleanup(Context context) throws IOException ,InterruptedException {
table.close();
conn.close();
protected void cleanup(final Context context) throws IOException, InterruptedException {
try {
table.close();
} catch (Exception e) {
LOG.warn("Exception closing Table", e);
}
try {
conn.close();
} catch (Exception e) {
LOG.warn("Exception closing Connection", e);
}
}
@Override
protected void map(HBaseKeyWritable key, BytesWritable value, Context output)
throws IOException, InterruptedException {
byte[] row = Bytes.copy(key.getRowArray(), key.getRowOffset(), key.getRowLength());
byte[] family = Bytes.copy(key.getFamilyArray(), key.getFamilyOffset(),
protected void map(final HBaseKeyWritable key, final BytesWritable value,
final Context output) throws IOException, InterruptedException {
final byte[] row = Bytes.copy(key.getRowArray(), key.getRowOffset(), key.getRowLength());
final byte[] family = Bytes.copy(key.getFamilyArray(), key.getFamilyOffset(),
key.getFamilyLength());
byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(),
final byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(),
key.getQualifierLength());
long ts = key.getTimestamp();
int retries = VERIFICATION_READ_RETRIES;
final long ts = key.getTimestamp();
int retries = VERIFICATION_READ_RETRIES;
while (true) {
if (Bytes.equals(INFO_FAMILY_NAME, family) &&
Bytes.equals(CRC_QUALIFIER, qualifier)) {
long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
Result result = table.get(new Get(row)
final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
final Result result = table.get(new Get(row)
.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER)
.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER)
.setTimestamp(ts));
byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
final byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
if (content == null) {
if (retries-- > 0) {
continue;
@ -795,18 +811,15 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
} else {
CRC64 crc = new CRC64();
final CRC64 crc = new CRC64();
crc.update(content);
if (crc.getValue() != expectedCRC64) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
}
byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
final byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
if (crc == null) {
if (retries-- > 0) {
continue;
@ -826,10 +839,10 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
} else {
Result result = table.get(new Get(row)
final Result result = table.get(new Get(row)
.addColumn(family, qualifier)
.setTimestamp(ts));
byte[] bytes = result.getValue(family, qualifier);
final byte[] bytes = result.getValue(family, qualifier);
if (bytes == null) {
if (retries-- > 0) {
continue;
@ -862,4 +875,15 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
}
}
private static final AtomicLong counter = new AtomicLong();
private static long getCurrentTime() {
// Typical hybrid logical clock scheme.
// Take the current time, shift by 16 bits and zero those bits, and replace those bits
// with the low 16 bits of the atomic counter. Mask off the high bit too because timestamps
// cannot be negative.
return ((EnvironmentEdgeManager.currentTime() << 16) & 0x7fff_ffff_ffff_0000L) |
(counter.getAndIncrement() & 0xffffL);
}
}