HBASE-27088 IntegrationLoadTestCommonCrawl async load improvements (#4488)
* HBASE-27088 IntegrationLoadTestCommonCrawl async load improvements - Use an async client and work stealing executor for parallelism during loads. - Remove the verification read retries, these are not that effective during replication lag anyway. - Increase max task attempts because S3 might throttle. - Implement a side task that exercises Increments by extracting urls from content and updating a cf that tracks referrer counts. These are not validated at this time. It could be possible to log the increments, sum them with a reducer, and then verify the total, but this is left as a future exercise. Signed-off-by: Viraj Jasani <vjasani@apache.org> * Sum RPC time for writes (loader) and reads (verifier) and mutation bytes submitted. Expose as job counters. * Fix an issue with completion chaining * Pause loading if too many operations are in flight
This commit is contained in:
parent
47230c4727
commit
4e19892a53
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.test;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
@ -26,10 +27,18 @@ import java.net.URI;
|
|||
import java.net.URISyntaxException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
|
@ -45,14 +54,17 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
|
|||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.client.AsyncTable;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ScanResultConsumer;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
|
@ -114,6 +126,10 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
|
|||
* not specify compression (by default) there is roughly a 10x expansion. Loading the full crawl
|
||||
* archive results in a table approximately 640 TB in size.
|
||||
* <p>
|
||||
* The loader can optionally drive read load during ingest by incrementing counters for each URL
|
||||
* discovered in content. Add <tt>-DIntegrationTestLoadCommonCrawl.increments=true</tt> to the
|
||||
* command line to enable.
|
||||
* <p>
|
||||
* You can also split the Loader and Verify stages:
|
||||
* <p>
|
||||
* Load with: <blockquote> ./bin/hbase
|
||||
|
@ -136,26 +152,34 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadCommonCrawl.class);
|
||||
|
||||
protected static String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table";
|
||||
protected static String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl";
|
||||
static final String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table";
|
||||
static final String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl";
|
||||
|
||||
protected static byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c");
|
||||
protected static byte[] INFO_FAMILY_NAME = Bytes.toBytes("i");
|
||||
protected static byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY;
|
||||
protected static byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l");
|
||||
protected static byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t");
|
||||
protected static byte[] CRC_QUALIFIER = Bytes.toBytes("c");
|
||||
protected static byte[] DATE_QUALIFIER = Bytes.toBytes("d");
|
||||
protected static byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a");
|
||||
protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r");
|
||||
protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
|
||||
static final String INCREMENTS_NAME_KEY = "IntegrationTestLoadCommonCrawl.increments";
|
||||
static final boolean DEFAULT_INCREMENTS = false;
|
||||
|
||||
private static final int VERIFICATION_READ_RETRIES = 10;
|
||||
static final int MAX_INFLIGHT = 1000;
|
||||
static final int INFLIGHT_PAUSE_MS = 100;
|
||||
|
||||
static final byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c");
|
||||
static final byte[] INFO_FAMILY_NAME = Bytes.toBytes("i");
|
||||
static final byte[] URL_FAMILY_NAME = Bytes.toBytes("u");
|
||||
static final byte[] SEP = Bytes.toBytes(":");
|
||||
static final byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY;
|
||||
static final byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l");
|
||||
static final byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t");
|
||||
static final byte[] CRC_QUALIFIER = Bytes.toBytes("c");
|
||||
static final byte[] DATE_QUALIFIER = Bytes.toBytes("d");
|
||||
static final byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a");
|
||||
static final byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
|
||||
static final byte[] REF_QUALIFIER = Bytes.toBytes("ref");
|
||||
|
||||
public static enum Counts {
|
||||
REFERENCED,
|
||||
UNREFERENCED,
|
||||
CORRUPT
|
||||
CORRUPT,
|
||||
RPC_BYTES_WRITTEN,
|
||||
RPC_TIME_MS,
|
||||
}
|
||||
|
||||
protected Path warcFileInputDir = null;
|
||||
|
@ -241,6 +265,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
Set<String> families = new HashSet<>();
|
||||
families.add(Bytes.toString(CONTENT_FAMILY_NAME));
|
||||
families.add(Bytes.toString(INFO_FAMILY_NAME));
|
||||
families.add(Bytes.toString(URL_FAMILY_NAME));
|
||||
return families;
|
||||
}
|
||||
|
||||
|
@ -292,10 +317,18 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
qualifier != null ? qualifier.length : 0, ts);
|
||||
}
|
||||
|
||||
public HBaseKeyWritable(byte[] row, byte[] family, byte[] qualifier) {
|
||||
this(row, family, qualifier, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
public HBaseKeyWritable(byte[] row, byte[] family, long ts) {
|
||||
this(row, family, HConstants.EMPTY_BYTE_ARRAY, ts);
|
||||
}
|
||||
|
||||
public HBaseKeyWritable(byte[] row, byte[] family) {
|
||||
this(row, family, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
public HBaseKeyWritable(Cell cell) {
|
||||
this(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(),
|
||||
cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(),
|
||||
|
@ -421,17 +454,24 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
Admin admin = conn.getAdmin()) {
|
||||
if (!admin.tableExists(tableName)) {
|
||||
|
||||
ColumnFamilyDescriptorBuilder contentFamilyBuilder = ColumnFamilyDescriptorBuilder
|
||||
.newBuilder(CONTENT_FAMILY_NAME).setDataBlockEncoding(DataBlockEncoding.NONE)
|
||||
.setBloomFilterType(BloomType.ROW).setMaxVersions(1000).setBlocksize(256 * 1024);
|
||||
ColumnFamilyDescriptorBuilder contentFamilyBuilder =
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(CONTENT_FAMILY_NAME).setMaxVersions(1000)
|
||||
.setDataBlockEncoding(DataBlockEncoding.NONE).setBloomFilterType(BloomType.ROW);
|
||||
|
||||
ColumnFamilyDescriptorBuilder infoFamilyBuilder = ColumnFamilyDescriptorBuilder
|
||||
.newBuilder(INFO_FAMILY_NAME).setDataBlockEncoding(DataBlockEncoding.NONE)
|
||||
.setBloomFilterType(BloomType.ROWCOL).setMaxVersions(1000).setBlocksize(8 * 1024);
|
||||
ColumnFamilyDescriptorBuilder infoFamilyBuilder =
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(INFO_FAMILY_NAME).setMaxVersions(1000)
|
||||
.setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1)
|
||||
.setBloomFilterType(BloomType.ROWCOL).setBlocksize(8 * 1024);
|
||||
|
||||
ColumnFamilyDescriptorBuilder urlFamilyBuilder =
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(URL_FAMILY_NAME).setMaxVersions(1000)
|
||||
.setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1)
|
||||
.setBloomFilterType(BloomType.ROWCOL).setBlocksize(8 * 1024);
|
||||
|
||||
Set<ColumnFamilyDescriptor> families = new HashSet<>();
|
||||
families.add(contentFamilyBuilder.build());
|
||||
families.add(infoFamilyBuilder.build());
|
||||
families.add(urlFamilyBuilder.build());
|
||||
|
||||
TableDescriptor tableDescriptor =
|
||||
TableDescriptorBuilder.newBuilder(tableName).setColumnFamilies(families).build();
|
||||
|
@ -507,11 +547,23 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
job.setOutputKeyClass(HBaseKeyWritable.class);
|
||||
job.setOutputValueClass(BytesWritable.class);
|
||||
TableMapReduceUtil.addDependencyJars(job);
|
||||
// Increase max attempts because S3 might throttle aggressively and ultimately fail a task
|
||||
job.getConfiguration().setInt("mapred.map.max.attempts", 100);
|
||||
job.getConfiguration().setInt("mapreduce.map.maxattempts", 100);
|
||||
|
||||
boolean success = job.waitForCompletion(true);
|
||||
if (!success) {
|
||||
LOG.error("Failure during job " + job.getJobID());
|
||||
}
|
||||
|
||||
final Counters counters = job.getCounters();
|
||||
for (Counts c : Counts.values()) {
|
||||
long value = counters.findCounter(c).getValue();
|
||||
if (value != 0) {
|
||||
LOG.info(c + ": " + value);
|
||||
}
|
||||
}
|
||||
|
||||
return success ? 0 : 1;
|
||||
}
|
||||
|
||||
|
@ -539,24 +591,39 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
public static class LoaderMapper
|
||||
extends Mapper<LongWritable, WARCWritable, HBaseKeyWritable, BytesWritable> {
|
||||
|
||||
protected Configuration conf;
|
||||
protected Connection conn;
|
||||
protected BufferedMutator mutator;
|
||||
protected AsyncConnection conn;
|
||||
protected AsyncTable<ScanResultConsumer> table;
|
||||
protected ExecutorService executor;
|
||||
protected AtomicLong inflight = new AtomicLong();
|
||||
protected boolean doIncrements;
|
||||
|
||||
@Override
|
||||
protected void setup(final Context context) throws IOException, InterruptedException {
|
||||
conf = context.getConfiguration();
|
||||
conn = ConnectionFactory.createConnection(conf);
|
||||
mutator = conn.getBufferedMutator(getTablename(conf));
|
||||
executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors());
|
||||
Configuration conf = context.getConfiguration();
|
||||
doIncrements = conf.getBoolean(INCREMENTS_NAME_KEY, DEFAULT_INCREMENTS);
|
||||
try {
|
||||
conn = ConnectionFactory.createAsyncConnection(conf).get();
|
||||
table = conn.getTable(getTablename(conf), executor);
|
||||
} catch (ExecutionException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cleanup(final Context context) throws IOException, InterruptedException {
|
||||
try {
|
||||
mutator.close();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Exception closing Table", e);
|
||||
|
||||
while (inflight.get() != 0) {
|
||||
LOG.info("Operations in flight, waiting");
|
||||
Thread.sleep(INFLIGHT_PAUSE_MS);
|
||||
}
|
||||
|
||||
// Shut down the executor
|
||||
executor.shutdown();
|
||||
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
|
||||
LOG.warn("Pool did not shut down cleanly");
|
||||
}
|
||||
// Close the connection
|
||||
try {
|
||||
conn.close();
|
||||
} catch (Exception e) {
|
||||
|
@ -573,10 +640,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
if (warcHeader.getRecordType().equals("response") && targetURI != null) {
|
||||
final String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
|
||||
if (contentType != null) {
|
||||
LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID);
|
||||
|
||||
// Make row key
|
||||
|
||||
byte[] rowKey;
|
||||
try {
|
||||
rowKey = rowKeyFromTargetURI(targetURI);
|
||||
|
@ -590,22 +654,21 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
}
|
||||
|
||||
// Get the content and calculate the CRC64
|
||||
|
||||
final byte[] content = value.getRecord().getContent();
|
||||
final CRC64 crc = new CRC64();
|
||||
crc.update(content);
|
||||
final long crc64 = crc.getValue();
|
||||
LOG.info("{}: content {} bytes, crc64={}", targetURI, content.length,
|
||||
Bytes.toHex(Bytes.toBytes(crc64)));
|
||||
|
||||
// Store to HBase
|
||||
|
||||
final long ts = getCurrentTime();
|
||||
final long ts = getSequence();
|
||||
final Put put = new Put(rowKey);
|
||||
put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, ts, content);
|
||||
put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts,
|
||||
Bytes.toBytes(content.length));
|
||||
put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts, Bytes.toBytes(contentType));
|
||||
put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, ts, Bytes.toBytes(crc64));
|
||||
put.addColumn(INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, ts, Bytes.toBytes(recordID));
|
||||
put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts, Bytes.toBytes(targetURI));
|
||||
put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, ts,
|
||||
Bytes.toBytes(warcHeader.getDateString()));
|
||||
|
@ -613,20 +676,32 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
if (ipAddr != null) {
|
||||
put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr));
|
||||
}
|
||||
mutator.mutate(put);
|
||||
long pending = inflight.incrementAndGet();
|
||||
while (pending > MAX_INFLIGHT) {
|
||||
LOG.info("Too many operations in flight, waiting");
|
||||
Thread.sleep(INFLIGHT_PAUSE_MS);
|
||||
pending = inflight.get();
|
||||
}
|
||||
final long putStartTime = System.currentTimeMillis();
|
||||
final CompletableFuture<Void> putFuture = table.put(put);
|
||||
putFuture.thenRun(() -> {
|
||||
inflight.decrementAndGet();
|
||||
if (!putFuture.isCompletedExceptionally()) {
|
||||
output.getCounter(Counts.RPC_TIME_MS)
|
||||
.increment(System.currentTimeMillis() - putStartTime);
|
||||
output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(put.heapSize());
|
||||
}
|
||||
});
|
||||
|
||||
// Write records out for later verification, one per HBase field except for the
|
||||
// content record, which will be verified by CRC64.
|
||||
|
||||
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts),
|
||||
new BytesWritable(Bytes.toBytes(crc64)));
|
||||
output.write(
|
||||
new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts),
|
||||
new BytesWritable(Bytes.toBytes(content.length)));
|
||||
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts),
|
||||
new BytesWritable(Bytes.toBytes(contentType)));
|
||||
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, ts),
|
||||
new BytesWritable(Bytes.toBytes(recordID)));
|
||||
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts),
|
||||
new BytesWritable(Bytes.toBytes(crc64)));
|
||||
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts),
|
||||
new BytesWritable(Bytes.toBytes(targetURI)));
|
||||
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, ts),
|
||||
|
@ -635,54 +710,44 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts),
|
||||
new BytesWritable(Bytes.toBytes(ipAddr)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private byte[] rowKeyFromTargetURI(final String targetUri)
|
||||
throws URISyntaxException, IllegalArgumentException {
|
||||
final URI uri = new URI(targetUri);
|
||||
// Ignore the scheme
|
||||
// Reverse the components of the hostname
|
||||
String reversedHost;
|
||||
if (uri.getHost() != null) {
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
final String[] hostComponents = uri.getHost().split("\\.");
|
||||
for (int i = hostComponents.length - 1; i >= 0; i--) {
|
||||
sb.append(hostComponents[i]);
|
||||
if (i != 0) {
|
||||
sb.append('.');
|
||||
if (doIncrements) {
|
||||
// The URLs cf is not tracked for correctness. For now it is used only to exercise
|
||||
// Increments, to drive some read load during ingest. They can be verified with a
|
||||
// reducer to sum increments per row and then compare the final count to the table
|
||||
// data. This is left as a future exercise.
|
||||
final byte[] refQual = Bytes.add(REF_QUALIFIER, SEP, rowKey);
|
||||
for (String refUri : extractUrls(content)) {
|
||||
try {
|
||||
byte[] urlRowKey = rowKeyFromTargetURI(refUri);
|
||||
LOG.debug(" -> {}", refUri);
|
||||
final Increment increment = new Increment(urlRowKey);
|
||||
increment.setTimestamp(ts);
|
||||
increment.addColumn(URL_FAMILY_NAME, refQual, 1);
|
||||
pending = inflight.incrementAndGet();
|
||||
while (pending > MAX_INFLIGHT) {
|
||||
LOG.info("Too many operations in flight, waiting");
|
||||
Thread.sleep(INFLIGHT_PAUSE_MS);
|
||||
pending = inflight.get();
|
||||
}
|
||||
final long incrStartTime = System.currentTimeMillis();
|
||||
final CompletableFuture<Result> incrFuture = table.increment(increment);
|
||||
incrFuture.thenRun(() -> {
|
||||
inflight.decrementAndGet();
|
||||
if (!incrFuture.isCompletedExceptionally()) {
|
||||
output.getCounter(Counts.RPC_TIME_MS)
|
||||
.increment(System.currentTimeMillis() - incrStartTime);
|
||||
output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(increment.heapSize());
|
||||
}
|
||||
});
|
||||
} catch (IllegalArgumentException | URISyntaxException e) {
|
||||
LOG.debug("Could not make a row key for URI " + refUri + ", ignoring", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
reversedHost = sb.toString();
|
||||
} else {
|
||||
throw new IllegalArgumentException("URI is missing host component");
|
||||
}
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
sb.append(reversedHost);
|
||||
if (uri.getPort() >= 0) {
|
||||
sb.append(':');
|
||||
sb.append(uri.getPort());
|
||||
}
|
||||
if (uri.getPath() != null) {
|
||||
sb.append('/');
|
||||
sb.append(uri.getPath());
|
||||
}
|
||||
if (uri.getQuery() != null) {
|
||||
sb.append('?');
|
||||
sb.append(uri.getQuery());
|
||||
}
|
||||
if (uri.getFragment() != null) {
|
||||
sb.append('#');
|
||||
sb.append(uri.getFragment());
|
||||
}
|
||||
if (sb.length() > HConstants.MAX_ROW_LENGTH) {
|
||||
throw new IllegalArgumentException("Key would be too large (length=" + sb.length()
|
||||
+ ", limit=" + HConstants.MAX_ROW_LENGTH);
|
||||
}
|
||||
return Bytes.toBytes(sb.toString());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -709,13 +774,18 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
job.setOutputKeyClass(NullWritable.class);
|
||||
job.setOutputValueClass(NullWritable.class);
|
||||
TableMapReduceUtil.addDependencyJars(job);
|
||||
|
||||
boolean success = job.waitForCompletion(true);
|
||||
if (!success) {
|
||||
LOG.error("Failure during job " + job.getJobID());
|
||||
}
|
||||
|
||||
final Counters counters = job.getCounters();
|
||||
for (Counts c : Counts.values()) {
|
||||
LOG.info(c + ": " + counters.findCounter(c).getValue());
|
||||
long value = counters.findCounter(c).getValue();
|
||||
if (value != 0) {
|
||||
LOG.info(c + ": " + value);
|
||||
}
|
||||
}
|
||||
if (counters.findCounter(Counts.UNREFERENCED).getValue() > 0) {
|
||||
LOG.error("Nonzero UNREFERENCED count from job " + job.getJobID());
|
||||
|
@ -725,6 +795,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
LOG.error("Nonzero CORRUPT count from job " + job.getJobID());
|
||||
success = false;
|
||||
}
|
||||
|
||||
return success ? 0 : 1;
|
||||
}
|
||||
|
||||
|
@ -745,22 +816,25 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
public static class VerifyMapper
|
||||
extends Mapper<HBaseKeyWritable, BytesWritable, NullWritable, NullWritable> {
|
||||
|
||||
private Connection conn;
|
||||
private Table table;
|
||||
protected Connection conn;
|
||||
protected Table table;
|
||||
|
||||
@Override
|
||||
protected void setup(final Context context) throws IOException, InterruptedException {
|
||||
conn = ConnectionFactory.createConnection(context.getConfiguration());
|
||||
table = conn.getTable(getTablename(conn.getConfiguration()));
|
||||
Configuration conf = context.getConfiguration();
|
||||
conn = ConnectionFactory.createConnection(conf);
|
||||
table = conn.getTable(getTablename(conf));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cleanup(final Context context) throws IOException, InterruptedException {
|
||||
// Close the table
|
||||
try {
|
||||
table.close();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Exception closing Table", e);
|
||||
LOG.warn("Exception closing table", e);
|
||||
}
|
||||
// Close the connection
|
||||
try {
|
||||
conn.close();
|
||||
} catch (Exception e) {
|
||||
|
@ -778,95 +852,146 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
|
|||
Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(), key.getQualifierLength());
|
||||
final long ts = key.getTimestamp();
|
||||
|
||||
int retries = VERIFICATION_READ_RETRIES;
|
||||
while (true) {
|
||||
|
||||
if (Bytes.equals(INFO_FAMILY_NAME, family) && Bytes.equals(CRC_QUALIFIER, qualifier)) {
|
||||
|
||||
final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
|
||||
final Result result =
|
||||
table.get(new Get(row).addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER)
|
||||
.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER).setTimestamp(ts));
|
||||
final byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
|
||||
if (content == null) {
|
||||
if (retries-- > 0) {
|
||||
continue;
|
||||
}
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content");
|
||||
output.getCounter(Counts.UNREFERENCED).increment(1);
|
||||
return;
|
||||
} else {
|
||||
final CRC64 crc = new CRC64();
|
||||
crc.update(content);
|
||||
if (crc.getValue() != expectedCRC64) {
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
|
||||
output.getCounter(Counts.CORRUPT).increment(1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
final byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
|
||||
if (crc == null) {
|
||||
if (retries-- > 0) {
|
||||
continue;
|
||||
}
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c");
|
||||
output.getCounter(Counts.UNREFERENCED).increment(1);
|
||||
return;
|
||||
}
|
||||
if (Bytes.toLong(crc) != expectedCRC64) {
|
||||
if (retries-- > 0) {
|
||||
continue;
|
||||
}
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
|
||||
output.getCounter(Counts.CORRUPT).increment(1);
|
||||
return;
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
final Result result =
|
||||
table.get(new Get(row).addColumn(family, qualifier).setTimestamp(ts));
|
||||
final byte[] bytes = result.getValue(family, qualifier);
|
||||
if (bytes == null) {
|
||||
if (retries-- > 0) {
|
||||
continue;
|
||||
}
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing "
|
||||
+ Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
|
||||
output.getCounter(Counts.UNREFERENCED).increment(1);
|
||||
return;
|
||||
}
|
||||
if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
|
||||
if (retries-- > 0) {
|
||||
continue;
|
||||
}
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": " + Bytes.toStringBinary(family)
|
||||
+ ":" + Bytes.toStringBinary(qualifier) + " mismatch");
|
||||
output.getCounter(Counts.CORRUPT).increment(1);
|
||||
return;
|
||||
}
|
||||
|
||||
if (Bytes.equals(INFO_FAMILY_NAME, family) && Bytes.equals(CRC_QUALIFIER, qualifier)) {
|
||||
final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
|
||||
final Get get = new Get(row).setTimestamp(ts).addFamily(CONTENT_FAMILY_NAME)
|
||||
.addFamily(INFO_FAMILY_NAME);
|
||||
final long startTime = System.currentTimeMillis();
|
||||
Result r;
|
||||
try {
|
||||
r = table.get(get);
|
||||
output.getCounter(Counts.RPC_TIME_MS).increment(System.currentTimeMillis() - startTime);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e);
|
||||
output.getCounter(Counts.UNREFERENCED).increment(1);
|
||||
return;
|
||||
}
|
||||
|
||||
// If we fell through to here all verification checks have succeeded, potentially after
|
||||
// retries, and we must exit the while loop.
|
||||
final byte[] crcBytes = r.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
|
||||
if (crcBytes == null) {
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c");
|
||||
output.getCounter(Counts.UNREFERENCED).increment(1);
|
||||
return;
|
||||
}
|
||||
if (Bytes.toLong(crcBytes) != expectedCRC64) {
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
|
||||
output.getCounter(Counts.CORRUPT).increment(1);
|
||||
return;
|
||||
}
|
||||
// If we fell through to here all verification checks have succeeded for the info
|
||||
// record.
|
||||
output.getCounter(Counts.REFERENCED).increment(1);
|
||||
final byte[] content = r.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
|
||||
if (content == null) {
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content");
|
||||
output.getCounter(Counts.UNREFERENCED).increment(1);
|
||||
return;
|
||||
} else {
|
||||
final CRC64 crc = new CRC64();
|
||||
crc.update(content);
|
||||
if (crc.getValue() != expectedCRC64) {
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
|
||||
output.getCounter(Counts.CORRUPT).increment(1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// If we fell through to here all verification checks have succeeded for the content
|
||||
// record.
|
||||
output.getCounter(Counts.REFERENCED).increment(1);
|
||||
} else {
|
||||
final long startTime = System.currentTimeMillis();
|
||||
final Get get = new Get(row).setTimestamp(ts).addColumn(family, qualifier);
|
||||
Result r;
|
||||
try {
|
||||
r = table.get(get);
|
||||
output.getCounter(Counts.RPC_TIME_MS).increment(System.currentTimeMillis() - startTime);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e);
|
||||
output.getCounter(Counts.UNREFERENCED).increment(1);
|
||||
return;
|
||||
}
|
||||
final byte[] bytes = r.getValue(family, qualifier);
|
||||
if (bytes == null) {
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing "
|
||||
+ Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
|
||||
output.getCounter(Counts.UNREFERENCED).increment(1);
|
||||
return;
|
||||
}
|
||||
if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
|
||||
LOG.error("Row " + Bytes.toStringBinary(row) + ": " + Bytes.toStringBinary(family) + ":"
|
||||
+ Bytes.toStringBinary(qualifier) + " mismatch");
|
||||
output.getCounter(Counts.CORRUPT).increment(1);
|
||||
return;
|
||||
}
|
||||
// If we fell through to here all verification checks have succeeded for the info
|
||||
// record.
|
||||
output.getCounter(Counts.REFERENCED).increment(1);
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final AtomicLong counter = new AtomicLong();
|
||||
private static final int shift = 8;
|
||||
|
||||
private static long getCurrentTime() {
|
||||
// Typical hybrid logical clock scheme.
|
||||
// Take the current time, shift by 16 bits and zero those bits, and replace those bits
|
||||
// with the low 16 bits of the atomic counter. Mask off the high bit too because timestamps
|
||||
// cannot be negative.
|
||||
return ((EnvironmentEdgeManager.currentTime() << 16) & 0x7fff_ffff_ffff_0000L)
|
||||
| (counter.getAndIncrement() & 0xffffL);
|
||||
private static long getSequence() {
|
||||
long t = EnvironmentEdgeManager.currentTime();
|
||||
t <<= shift;
|
||||
t |= (counter.getAndIncrement() % (1 << shift));
|
||||
return t;
|
||||
}
|
||||
|
||||
private static byte[] rowKeyFromTargetURI(final String targetUri)
|
||||
throws IOException, URISyntaxException, IllegalArgumentException {
|
||||
final URI uri = new URI(targetUri);
|
||||
// Ignore the scheme
|
||||
// Reverse the components of the hostname
|
||||
String reversedHost;
|
||||
if (uri.getHost() != null) {
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
final String[] hostComponents = uri.getHost().split("\\.");
|
||||
for (int i = hostComponents.length - 1; i >= 0; i--) {
|
||||
sb.append(hostComponents[i]);
|
||||
if (i != 0) {
|
||||
sb.append('.');
|
||||
}
|
||||
}
|
||||
reversedHost = sb.toString();
|
||||
} else {
|
||||
throw new IllegalArgumentException("URI is missing host component");
|
||||
}
|
||||
final ByteArrayOutputStream os = new ByteArrayOutputStream();
|
||||
os.write(reversedHost.getBytes(StandardCharsets.UTF_8));
|
||||
if (uri.getPort() >= 0) {
|
||||
os.write(String.format(":%d", uri.getPort()).getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
os.write((byte) '|');
|
||||
if (uri.getPath() != null) {
|
||||
os.write(uri.getPath().getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
if (uri.getQuery() != null) {
|
||||
os.write(String.format("?%s", uri.getQuery()).getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
if (uri.getFragment() != null) {
|
||||
os.write(String.format("#%s", uri.getFragment()).getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
if (os.size() > HConstants.MAX_ROW_LENGTH) {
|
||||
throw new IllegalArgumentException(
|
||||
"Key would be too large (length=" + os.size() + ", limit=" + HConstants.MAX_ROW_LENGTH);
|
||||
}
|
||||
return os.toByteArray();
|
||||
}
|
||||
|
||||
static final Pattern URL_PATTERN = Pattern.compile(
|
||||
"\\b((https?|ftp|file)://|(www|ftp)\\.)" + "[\\-A-Z0-9+&@#/%?=~_|$!:,\\.;]*[A-Z0-9+&@#/%=~_|$]",
|
||||
Pattern.CASE_INSENSITIVE);
|
||||
|
||||
private static Collection<String> extractUrls(byte[] content) {
|
||||
final Set<String> list = new HashSet<>(); // uniques
|
||||
final Matcher m = URL_PATTERN.matcher(new String(content, StandardCharsets.UTF_8));
|
||||
while (m.find()) {
|
||||
list.add(m.group());
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue