HBASE-27088 IntegrationTestLoadCommonCrawl async load improvements (#4488)

* HBASE-27088 IntegrationTestLoadCommonCrawl async load improvements

- Use an async client and a work-stealing executor for parallelism during loads.
- Remove the verification read retries; they are not effective during
  replication lag anyway.
- Increase max task attempts, because S3 might throttle.
- Implement a side task that exercises Increments by extracting URLs from
  content and updating a column family that tracks referrer counts (see the
  sketches after the change list below). These counts are not validated at
  this time. It would be possible to log the increments, sum them with a
  reducer, and then verify the totals, but this is left as a future exercise.

Signed-off-by: Viraj Jasani <vjasani@apache.org>

* Sum RPC time for writes (loader) and reads (verifier), as well as mutation bytes submitted, and expose these as job counters.

* Fix an issue with completion chaining.

* Pause loading if too many operations are in flight.
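
Illustrative sketches of the patterns above, condensed from the diff below;
names match the patch, and error handling, logging, and WARC plumbing are
omitted:

  // Backpressure: count operations submitted but not yet completed, and
  // pause the mapper while too many are outstanding.
  long pending = inflight.incrementAndGet();
  while (pending > MAX_INFLIGHT) { // MAX_INFLIGHT = 1000
    Thread.sleep(INFLIGHT_PAUSE_MS); // INFLIGHT_PAUSE_MS = 100 ms
    pending = inflight.get();
  }

  // Completion chaining: issue the put through the async table, then record
  // RPC time and bytes as job counters from the callback, which also
  // releases the in-flight slot.
  final long putStartTime = System.currentTimeMillis();
  final CompletableFuture<Void> putFuture = table.put(put);
  putFuture.thenRun(() -> {
    inflight.decrementAndGet();
    if (!putFuture.isCompletedExceptionally()) {
      output.getCounter(Counts.RPC_TIME_MS)
        .increment(System.currentTimeMillis() - putStartTime);
      output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(put.heapSize());
    }
  });

  // Increments side task: extract URLs from fetched content and bump a
  // per-referrer counter in the URL column family, subject to the same
  // in-flight gate.
  for (String refUri : extractUrls(content)) {
    final Increment increment = new Increment(rowKeyFromTargetURI(refUri));
    increment.setTimestamp(ts);
    increment.addColumn(URL_FAMILY_NAME, Bytes.add(REF_QUALIFIER, SEP, rowKey), 1);
    table.increment(increment); // returns CompletableFuture<Result>
  }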
Andrew Purtell 2022-07-13 09:01:21 -07:00 committed by GitHub
parent 2662607b71
commit ef1641d277
1 changed file with 298 additions and 173 deletions


@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.test;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -26,10 +27,18 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -45,14 +54,17 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -114,6 +126,10 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
* not specify compression (by default) there is roughly a 10x expansion. Loading the full crawl
* archive results in a table approximately 640 TB in size.
* <p>
* The loader can optionally drive read load during ingest by incrementing counters for each URL
* discovered in content. Add <tt>-DIntegrationTestLoadCommonCrawl.increments=true</tt> to the
* command line to enable.
* <p>
* You can also split the Loader and Verify stages:
* <p>
* Load with: <blockquote> ./bin/hbase
@@ -136,26 +152,34 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadCommonCrawl.class);
protected static String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table";
protected static String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl";
static final String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table";
static final String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl";
protected static byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c");
protected static byte[] INFO_FAMILY_NAME = Bytes.toBytes("i");
protected static byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY;
protected static byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l");
protected static byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t");
protected static byte[] CRC_QUALIFIER = Bytes.toBytes("c");
protected static byte[] DATE_QUALIFIER = Bytes.toBytes("d");
protected static byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a");
protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r");
protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
static final String INCREMENTS_NAME_KEY = "IntegrationTestLoadCommonCrawl.increments";
static final boolean DEFAULT_INCREMENTS = false;
private static final int VERIFICATION_READ_RETRIES = 10;
static final int MAX_INFLIGHT = 1000;
static final int INFLIGHT_PAUSE_MS = 100;
static final byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c");
static final byte[] INFO_FAMILY_NAME = Bytes.toBytes("i");
static final byte[] URL_FAMILY_NAME = Bytes.toBytes("u");
static final byte[] SEP = Bytes.toBytes(":");
static final byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY;
static final byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l");
static final byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t");
static final byte[] CRC_QUALIFIER = Bytes.toBytes("c");
static final byte[] DATE_QUALIFIER = Bytes.toBytes("d");
static final byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a");
static final byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
static final byte[] REF_QUALIFIER = Bytes.toBytes("ref");
public static enum Counts {
REFERENCED,
UNREFERENCED,
CORRUPT
CORRUPT,
RPC_BYTES_WRITTEN,
RPC_TIME_MS,
}
protected Path warcFileInputDir = null;
@@ -241,6 +265,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
Set<String> families = new HashSet<>();
families.add(Bytes.toString(CONTENT_FAMILY_NAME));
families.add(Bytes.toString(INFO_FAMILY_NAME));
families.add(Bytes.toString(URL_FAMILY_NAME));
return families;
}
@@ -292,10 +317,18 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
qualifier != null ? qualifier.length : 0, ts);
}
public HBaseKeyWritable(byte[] row, byte[] family, byte[] qualifier) {
this(row, family, qualifier, Long.MAX_VALUE);
}
public HBaseKeyWritable(byte[] row, byte[] family, long ts) {
this(row, family, HConstants.EMPTY_BYTE_ARRAY, ts);
}
public HBaseKeyWritable(byte[] row, byte[] family) {
this(row, family, Long.MAX_VALUE);
}
public HBaseKeyWritable(Cell cell) {
this(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(),
cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(),
@@ -421,17 +454,24 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
Admin admin = conn.getAdmin()) {
if (!admin.tableExists(tableName)) {
ColumnFamilyDescriptorBuilder contentFamilyBuilder = ColumnFamilyDescriptorBuilder
.newBuilder(CONTENT_FAMILY_NAME).setDataBlockEncoding(DataBlockEncoding.NONE)
.setBloomFilterType(BloomType.ROW).setMaxVersions(1000).setBlocksize(256 * 1024);
ColumnFamilyDescriptorBuilder contentFamilyBuilder =
ColumnFamilyDescriptorBuilder.newBuilder(CONTENT_FAMILY_NAME).setMaxVersions(1000)
.setDataBlockEncoding(DataBlockEncoding.NONE).setBloomFilterType(BloomType.ROW);
ColumnFamilyDescriptorBuilder infoFamilyBuilder = ColumnFamilyDescriptorBuilder
.newBuilder(INFO_FAMILY_NAME).setDataBlockEncoding(DataBlockEncoding.NONE)
.setBloomFilterType(BloomType.ROWCOL).setMaxVersions(1000).setBlocksize(8 * 1024);
ColumnFamilyDescriptorBuilder infoFamilyBuilder =
ColumnFamilyDescriptorBuilder.newBuilder(INFO_FAMILY_NAME).setMaxVersions(1000)
.setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1)
.setBloomFilterType(BloomType.ROWCOL).setBlocksize(8 * 1024);
ColumnFamilyDescriptorBuilder urlFamilyBuilder =
ColumnFamilyDescriptorBuilder.newBuilder(URL_FAMILY_NAME).setMaxVersions(1000)
.setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1)
.setBloomFilterType(BloomType.ROWCOL).setBlocksize(8 * 1024);
Set<ColumnFamilyDescriptor> families = new HashSet<>();
families.add(contentFamilyBuilder.build());
families.add(infoFamilyBuilder.build());
families.add(urlFamilyBuilder.build());
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(tableName).setColumnFamilies(families).build();
@@ -507,11 +547,23 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
job.setOutputKeyClass(HBaseKeyWritable.class);
job.setOutputValueClass(BytesWritable.class);
TableMapReduceUtil.addDependencyJars(job);
// Increase max attempts because S3 might throttle aggressively and ultimately fail a task
job.getConfiguration().setInt("mapred.map.max.attempts", 100);
job.getConfiguration().setInt("mapreduce.map.maxattempts", 100);
boolean success = job.waitForCompletion(true);
if (!success) {
LOG.error("Failure during job " + job.getJobID());
}
final Counters counters = job.getCounters();
for (Counts c : Counts.values()) {
long value = counters.findCounter(c).getValue();
if (value != 0) {
LOG.info(c + ": " + value);
}
}
return success ? 0 : 1;
}
@@ -539,24 +591,39 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
public static class LoaderMapper
extends Mapper<LongWritable, WARCWritable, HBaseKeyWritable, BytesWritable> {
protected Configuration conf;
protected Connection conn;
protected BufferedMutator mutator;
protected AsyncConnection conn;
protected AsyncTable<ScanResultConsumer> table;
protected ExecutorService executor;
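// Count of async operations submitted but not yet completed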
protected AtomicLong inflight = new AtomicLong();
protected boolean doIncrements;
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
conf = context.getConfiguration();
conn = ConnectionFactory.createConnection(conf);
mutator = conn.getBufferedMutator(getTablename(conf));
executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors());
Configuration conf = context.getConfiguration();
doIncrements = conf.getBoolean(INCREMENTS_NAME_KEY, DEFAULT_INCREMENTS);
try {
conn = ConnectionFactory.createAsyncConnection(conf).get();
table = conn.getTable(getTablename(conf), executor);
} catch (ExecutionException e) {
throw new IOException(e);
}
}
@Override
protected void cleanup(final Context context) throws IOException, InterruptedException {
try {
mutator.close();
} catch (Exception e) {
LOG.warn("Exception closing Table", e);
while (inflight.get() != 0) {
LOG.info("Operations in flight, waiting");
Thread.sleep(INFLIGHT_PAUSE_MS);
}
// Shut down the executor
executor.shutdown();
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
LOG.warn("Pool did not shut down cleanly");
}
// Close the connection
try {
conn.close();
} catch (Exception e) {
@@ -573,10 +640,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
if (warcHeader.getRecordType().equals("response") && targetURI != null) {
final String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
if (contentType != null) {
LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID);
// Make row key
byte[] rowKey;
try {
rowKey = rowKeyFromTargetURI(targetURI);
@@ -590,22 +654,21 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
}
// Get the content and calculate the CRC64
final byte[] content = value.getRecord().getContent();
final CRC64 crc = new CRC64();
crc.update(content);
final long crc64 = crc.getValue();
LOG.info("{}: content {} bytes, crc64={}", targetURI, content.length,
Bytes.toHex(Bytes.toBytes(crc64)));
// Store to HBase
final long ts = getCurrentTime();
final long ts = getSequence();
final Put put = new Put(rowKey);
put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, ts, content);
put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts,
Bytes.toBytes(content.length));
put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts, Bytes.toBytes(contentType));
put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, ts, Bytes.toBytes(crc64));
put.addColumn(INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, ts, Bytes.toBytes(recordID));
put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts, Bytes.toBytes(targetURI));
put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, ts,
Bytes.toBytes(warcHeader.getDateString()));
@@ -613,20 +676,32 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
if (ipAddr != null) {
put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr));
}
mutator.mutate(put);
long pending = inflight.incrementAndGet();
while (pending > MAX_INFLIGHT) {
LOG.info("Too many operations in flight, waiting");
Thread.sleep(INFLIGHT_PAUSE_MS);
pending = inflight.get();
}
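// Issue the put asynchronously; the completion callback decrements the
// in-flight count and records RPC time and bytes as job counters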
final long putStartTime = System.currentTimeMillis();
final CompletableFuture<Void> putFuture = table.put(put);
putFuture.thenRun(() -> {
inflight.decrementAndGet();
if (!putFuture.isCompletedExceptionally()) {
output.getCounter(Counts.RPC_TIME_MS)
.increment(System.currentTimeMillis() - putStartTime);
output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(put.heapSize());
}
});
// Write records out for later verification, one per HBase field except for the
// content record, which will be verified by CRC64.
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts),
new BytesWritable(Bytes.toBytes(crc64)));
output.write(
new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts),
new BytesWritable(Bytes.toBytes(content.length)));
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts),
new BytesWritable(Bytes.toBytes(contentType)));
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, ts),
new BytesWritable(Bytes.toBytes(recordID)));
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts),
new BytesWritable(Bytes.toBytes(crc64)));
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts),
new BytesWritable(Bytes.toBytes(targetURI)));
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, ts),
@@ -635,54 +710,44 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts),
new BytesWritable(Bytes.toBytes(ipAddr)));
}
}
}
}
private byte[] rowKeyFromTargetURI(final String targetUri)
throws URISyntaxException, IllegalArgumentException {
final URI uri = new URI(targetUri);
// Ignore the scheme
// Reverse the components of the hostname
String reversedHost;
if (uri.getHost() != null) {
final StringBuilder sb = new StringBuilder();
final String[] hostComponents = uri.getHost().split("\\.");
for (int i = hostComponents.length - 1; i >= 0; i--) {
sb.append(hostComponents[i]);
if (i != 0) {
sb.append('.');
if (doIncrements) {
// The URL cf is not tracked for correctness. For now it is used only to exercise
// Increments, to drive some read load during ingest. The increments could be
// verified by a reducer that sums them per row and compares the final counts to
// the table data, but this is left as a future exercise.
final byte[] refQual = Bytes.add(REF_QUALIFIER, SEP, rowKey);
for (String refUri : extractUrls(content)) {
try {
byte[] urlRowKey = rowKeyFromTargetURI(refUri);
LOG.debug(" -> {}", refUri);
final Increment increment = new Increment(urlRowKey);
increment.setTimestamp(ts);
increment.addColumn(URL_FAMILY_NAME, refQual, 1);
pending = inflight.incrementAndGet();
while (pending > MAX_INFLIGHT) {
LOG.info("Too many operations in flight, waiting");
Thread.sleep(INFLIGHT_PAUSE_MS);
pending = inflight.get();
}
final long incrStartTime = System.currentTimeMillis();
final CompletableFuture<Result> incrFuture = table.increment(increment);
incrFuture.thenRun(() -> {
inflight.decrementAndGet();
if (!incrFuture.isCompletedExceptionally()) {
output.getCounter(Counts.RPC_TIME_MS)
.increment(System.currentTimeMillis() - incrStartTime);
output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(increment.heapSize());
}
});
} catch (IllegalArgumentException | URISyntaxException e) {
LOG.debug("Could not make a row key for URI " + refUri + ", ignoring", e);
}
}
}
}
reversedHost = sb.toString();
} else {
throw new IllegalArgumentException("URI is missing host component");
}
final StringBuilder sb = new StringBuilder();
sb.append(reversedHost);
if (uri.getPort() >= 0) {
sb.append(':');
sb.append(uri.getPort());
}
if (uri.getPath() != null) {
sb.append('/');
sb.append(uri.getPath());
}
if (uri.getQuery() != null) {
sb.append('?');
sb.append(uri.getQuery());
}
if (uri.getFragment() != null) {
sb.append('#');
sb.append(uri.getFragment());
}
if (sb.length() > HConstants.MAX_ROW_LENGTH) {
throw new IllegalArgumentException("Key would be too large (length=" + sb.length()
+ ", limit=" + HConstants.MAX_ROW_LENGTH);
}
return Bytes.toBytes(sb.toString());
}
}
}
@@ -709,13 +774,18 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
TableMapReduceUtil.addDependencyJars(job);
boolean success = job.waitForCompletion(true);
if (!success) {
LOG.error("Failure during job " + job.getJobID());
}
final Counters counters = job.getCounters();
for (Counts c : Counts.values()) {
LOG.info(c + ": " + counters.findCounter(c).getValue());
long value = counters.findCounter(c).getValue();
if (value != 0) {
LOG.info(c + ": " + value);
}
}
if (counters.findCounter(Counts.UNREFERENCED).getValue() > 0) {
LOG.error("Nonzero UNREFERENCED count from job " + job.getJobID());
@@ -725,6 +795,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
LOG.error("Nonzero CORRUPT count from job " + job.getJobID());
success = false;
}
return success ? 0 : 1;
}
@@ -745,22 +816,25 @@
public static class VerifyMapper
extends Mapper<HBaseKeyWritable, BytesWritable, NullWritable, NullWritable> {
private Connection conn;
private Table table;
protected Connection conn;
protected Table table;
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
conn = ConnectionFactory.createConnection(context.getConfiguration());
table = conn.getTable(getTablename(conn.getConfiguration()));
Configuration conf = context.getConfiguration();
conn = ConnectionFactory.createConnection(conf);
table = conn.getTable(getTablename(conf));
}
@Override
protected void cleanup(final Context context) throws IOException, InterruptedException {
// Close the table
try {
table.close();
} catch (Exception e) {
LOG.warn("Exception closing Table", e);
LOG.warn("Exception closing table", e);
}
// Close the connection
try {
conn.close();
} catch (Exception e) {
@@ -778,95 +852,146 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(), key.getQualifierLength());
final long ts = key.getTimestamp();
int retries = VERIFICATION_READ_RETRIES;
while (true) {
if (Bytes.equals(INFO_FAMILY_NAME, family) && Bytes.equals(CRC_QUALIFIER, qualifier)) {
final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
final Result result =
table.get(new Get(row).addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER)
.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER).setTimestamp(ts));
final byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
if (content == null) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
} else {
final CRC64 crc = new CRC64();
crc.update(content);
if (crc.getValue() != expectedCRC64) {
LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
}
final byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
if (crc == null) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (Bytes.toLong(crc) != expectedCRC64) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
} else {
final Result result =
table.get(new Get(row).addColumn(family, qualifier).setTimestamp(ts));
final byte[] bytes = result.getValue(family, qualifier);
if (bytes == null) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing "
+ Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": " + Bytes.toStringBinary(family)
+ ":" + Bytes.toStringBinary(qualifier) + " mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
if (Bytes.equals(INFO_FAMILY_NAME, family) && Bytes.equals(CRC_QUALIFIER, qualifier)) {
final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
final Get get = new Get(row).setTimestamp(ts).addFamily(CONTENT_FAMILY_NAME)
.addFamily(INFO_FAMILY_NAME);
final long startTime = System.currentTimeMillis();
Result r;
try {
r = table.get(get);
output.getCounter(Counts.RPC_TIME_MS).increment(System.currentTimeMillis() - startTime);
} catch (Exception e) {
LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e);
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
// If we fell through to here all verification checks have succeeded, potentially after
// retries, and we must exit the while loop.
final byte[] crcBytes = r.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
if (crcBytes == null) {
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (Bytes.toLong(crcBytes) != expectedCRC64) {
LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
// If we fell through to here all verification checks have succeeded for the info
// record.
output.getCounter(Counts.REFERENCED).increment(1);
final byte[] content = r.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
if (content == null) {
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
} else {
final CRC64 crc = new CRC64();
crc.update(content);
if (crc.getValue() != expectedCRC64) {
LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
}
// If we fell through to here all verification checks have succeeded for the content
// record.
output.getCounter(Counts.REFERENCED).increment(1);
} else {
final long startTime = System.currentTimeMillis();
final Get get = new Get(row).setTimestamp(ts).addColumn(family, qualifier);
Result r;
try {
r = table.get(get);
output.getCounter(Counts.RPC_TIME_MS).increment(System.currentTimeMillis() - startTime);
} catch (Exception e) {
LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e);
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
final byte[] bytes = r.getValue(family, qualifier);
if (bytes == null) {
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing "
+ Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
LOG.error("Row " + Bytes.toStringBinary(row) + ": " + Bytes.toStringBinary(family) + ":"
+ Bytes.toStringBinary(qualifier) + " mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
// If we fell through to here all verification checks have succeeded for the info
// record.
output.getCounter(Counts.REFERENCED).increment(1);
break;
}
}
}
}
private static final AtomicLong counter = new AtomicLong();
private static final int shift = 8;
private static long getCurrentTime() {
// Typical hybrid logical clock scheme.
// Take the current time, shift by 16 bits and zero those bits, and replace those bits
// with the low 16 bits of the atomic counter. Mask off the high bit too because timestamps
// cannot be negative.
return ((EnvironmentEdgeManager.currentTime() << 16) & 0x7fff_ffff_ffff_0000L)
| (counter.getAndIncrement() & 0xffffL);
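// Build cell timestamps from the wall clock shifted left by 'shift' bits, with
// the low bits taken from an atomic counter so calls within the same
// millisecond receive distinct values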
private static long getSequence() {
long t = EnvironmentEdgeManager.currentTime();
t <<= shift;
t |= (counter.getAndIncrement() % (1 << shift));
return t;
}
private static byte[] rowKeyFromTargetURI(final String targetUri)
throws IOException, URISyntaxException, IllegalArgumentException {
final URI uri = new URI(targetUri);
// Ignore the scheme
// Reverse the components of the hostname
String reversedHost;
if (uri.getHost() != null) {
final StringBuilder sb = new StringBuilder();
final String[] hostComponents = uri.getHost().split("\\.");
for (int i = hostComponents.length - 1; i >= 0; i--) {
sb.append(hostComponents[i]);
if (i != 0) {
sb.append('.');
}
}
reversedHost = sb.toString();
} else {
throw new IllegalArgumentException("URI is missing host component");
}
final ByteArrayOutputStream os = new ByteArrayOutputStream();
os.write(reversedHost.getBytes(StandardCharsets.UTF_8));
if (uri.getPort() >= 0) {
os.write(String.format(":%d", uri.getPort()).getBytes(StandardCharsets.UTF_8));
}
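// Separate the reversed host (and optional port) from the path, query, and fragment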
os.write((byte) '|');
if (uri.getPath() != null) {
os.write(uri.getPath().getBytes(StandardCharsets.UTF_8));
}
if (uri.getQuery() != null) {
os.write(String.format("?%s", uri.getQuery()).getBytes(StandardCharsets.UTF_8));
}
if (uri.getFragment() != null) {
os.write(String.format("#%s", uri.getFragment()).getBytes(StandardCharsets.UTF_8));
}
if (os.size() > HConstants.MAX_ROW_LENGTH) {
throw new IllegalArgumentException(
"Key would be too large (length=" + os.size() + ", limit=" + HConstants.MAX_ROW_LENGTH);
}
return os.toByteArray();
}
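// Crude matcher for absolute http(s)/ftp/file URLs and bare www./ftp. hostnames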
static final Pattern URL_PATTERN = Pattern.compile(
"\\b((https?|ftp|file)://|(www|ftp)\\.)" + "[\\-A-Z0-9+&@#/%?=~_|$!:,\\.;]*[A-Z0-9+&@#/%=~_|$]",
Pattern.CASE_INSENSITIVE);
private static Collection<String> extractUrls(byte[] content) {
final Set<String> list = new HashSet<>(); // uniques
final Matcher m = URL_PATTERN.matcher(new String(content, StandardCharsets.UTF_8));
while (m.find()) {
list.add(m.group());
}
return list;
}
}