From ef1641d2772ca751b59f797b1c85b6ae5b78ac56 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Wed, 13 Jul 2022 09:01:21 -0700 Subject: [PATCH] HBASE-27088 IntegrationLoadTestCommonCrawl async load improvements (#4488) * HBASE-27088 IntegrationLoadTestCommonCrawl async load improvements - Use an async client and work stealing executor for parallelism during loads. - Remove the verification read retries, these are not that effective during replication lag anyway. - Increase max task attempts because S3 might throttle. - Implement a side task that exercises Increments by extracting urls from content and updating a cf that tracks referrer counts. These are not validated at this time. It could be possible to log the increments, sum them with a reducer, and then verify the total, but this is left as a future exercise. Signed-off-by: Viraj Jasani * Sum RPC time for writes (loader) and reads (verifier) and mutation bytes submitted. Expose as job counters. * Fix an issue with completion chaining * Pause loading if too many operations are in flight --- .../test/IntegrationTestLoadCommonCrawl.java | 471 +++++++++++------- 1 file changed, 298 insertions(+), 173 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java index d57cd8198e6..bdb1c719af2 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.test; import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -26,10 +27,18 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -45,14 +54,17 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ScanResultConsumer; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; @@ -114,6 +126,10 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; * not specify compression (by default) there is roughly a 10x expansion. Loading the full crawl * archive results in a table approximately 640 TB in size. *

+ * The loader can optionally drive read load during ingest by incrementing counters for each URL + * discovered in content. Add -DIntegrationTestLoadCommonCrawl.increments=true to the + * command line to enable. + *

* You can also split the Loader and Verify stages: *

* Load with:

./bin/hbase @@ -136,26 +152,34 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadCommonCrawl.class); - protected static String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table"; - protected static String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl"; + static final String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table"; + static final String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl"; - protected static byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c"); - protected static byte[] INFO_FAMILY_NAME = Bytes.toBytes("i"); - protected static byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY; - protected static byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l"); - protected static byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t"); - protected static byte[] CRC_QUALIFIER = Bytes.toBytes("c"); - protected static byte[] DATE_QUALIFIER = Bytes.toBytes("d"); - protected static byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a"); - protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r"); - protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u"); + static final String INCREMENTS_NAME_KEY = "IntegrationTestLoadCommonCrawl.increments"; + static final boolean DEFAULT_INCREMENTS = false; - private static final int VERIFICATION_READ_RETRIES = 10; + static final int MAX_INFLIGHT = 1000; + static final int INFLIGHT_PAUSE_MS = 100; + + static final byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c"); + static final byte[] INFO_FAMILY_NAME = Bytes.toBytes("i"); + static final byte[] URL_FAMILY_NAME = Bytes.toBytes("u"); + static final byte[] SEP = Bytes.toBytes(":"); + static final byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY; + static final byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l"); + static final byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t"); + static final byte[] CRC_QUALIFIER = Bytes.toBytes("c"); + static final byte[] DATE_QUALIFIER = Bytes.toBytes("d"); + static final byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a"); + static final byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u"); + static final byte[] REF_QUALIFIER = Bytes.toBytes("ref"); public static enum Counts { REFERENCED, UNREFERENCED, - CORRUPT + CORRUPT, + RPC_BYTES_WRITTEN, + RPC_TIME_MS, } protected Path warcFileInputDir = null; @@ -241,6 +265,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { Set families = new HashSet<>(); families.add(Bytes.toString(CONTENT_FAMILY_NAME)); families.add(Bytes.toString(INFO_FAMILY_NAME)); + families.add(Bytes.toString(URL_FAMILY_NAME)); return families; } @@ -292,10 +317,18 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { qualifier != null ? qualifier.length : 0, ts); } + public HBaseKeyWritable(byte[] row, byte[] family, byte[] qualifier) { + this(row, family, qualifier, Long.MAX_VALUE); + } + public HBaseKeyWritable(byte[] row, byte[] family, long ts) { this(row, family, HConstants.EMPTY_BYTE_ARRAY, ts); } + public HBaseKeyWritable(byte[] row, byte[] family) { + this(row, family, Long.MAX_VALUE); + } + public HBaseKeyWritable(Cell cell) { this(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), @@ -421,17 +454,24 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { Admin admin = conn.getAdmin()) { if (!admin.tableExists(tableName)) { - ColumnFamilyDescriptorBuilder contentFamilyBuilder = ColumnFamilyDescriptorBuilder - .newBuilder(CONTENT_FAMILY_NAME).setDataBlockEncoding(DataBlockEncoding.NONE) - .setBloomFilterType(BloomType.ROW).setMaxVersions(1000).setBlocksize(256 * 1024); + ColumnFamilyDescriptorBuilder contentFamilyBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(CONTENT_FAMILY_NAME).setMaxVersions(1000) + .setDataBlockEncoding(DataBlockEncoding.NONE).setBloomFilterType(BloomType.ROW); - ColumnFamilyDescriptorBuilder infoFamilyBuilder = ColumnFamilyDescriptorBuilder - .newBuilder(INFO_FAMILY_NAME).setDataBlockEncoding(DataBlockEncoding.NONE) - .setBloomFilterType(BloomType.ROWCOL).setMaxVersions(1000).setBlocksize(8 * 1024); + ColumnFamilyDescriptorBuilder infoFamilyBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(INFO_FAMILY_NAME).setMaxVersions(1000) + .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1) + .setBloomFilterType(BloomType.ROWCOL).setBlocksize(8 * 1024); + + ColumnFamilyDescriptorBuilder urlFamilyBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(URL_FAMILY_NAME).setMaxVersions(1000) + .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1) + .setBloomFilterType(BloomType.ROWCOL).setBlocksize(8 * 1024); Set families = new HashSet<>(); families.add(contentFamilyBuilder.build()); families.add(infoFamilyBuilder.build()); + families.add(urlFamilyBuilder.build()); TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName).setColumnFamilies(families).build(); @@ -507,11 +547,23 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { job.setOutputKeyClass(HBaseKeyWritable.class); job.setOutputValueClass(BytesWritable.class); TableMapReduceUtil.addDependencyJars(job); + // Increase max attempts because S3 might throttle aggressively and ultimately fail a task + job.getConfiguration().setInt("mapred.map.max.attempts", 100); + job.getConfiguration().setInt("mapreduce.map.maxattempts", 100); boolean success = job.waitForCompletion(true); if (!success) { LOG.error("Failure during job " + job.getJobID()); } + + final Counters counters = job.getCounters(); + for (Counts c : Counts.values()) { + long value = counters.findCounter(c).getValue(); + if (value != 0) { + LOG.info(c + ": " + value); + } + } + return success ? 0 : 1; } @@ -539,24 +591,39 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { public static class LoaderMapper extends Mapper { - protected Configuration conf; - protected Connection conn; - protected BufferedMutator mutator; + protected AsyncConnection conn; + protected AsyncTable table; + protected ExecutorService executor; + protected AtomicLong inflight = new AtomicLong(); + protected boolean doIncrements; @Override protected void setup(final Context context) throws IOException, InterruptedException { - conf = context.getConfiguration(); - conn = ConnectionFactory.createConnection(conf); - mutator = conn.getBufferedMutator(getTablename(conf)); + executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors()); + Configuration conf = context.getConfiguration(); + doIncrements = conf.getBoolean(INCREMENTS_NAME_KEY, DEFAULT_INCREMENTS); + try { + conn = ConnectionFactory.createAsyncConnection(conf).get(); + table = conn.getTable(getTablename(conf), executor); + } catch (ExecutionException e) { + throw new IOException(e); + } } @Override protected void cleanup(final Context context) throws IOException, InterruptedException { - try { - mutator.close(); - } catch (Exception e) { - LOG.warn("Exception closing Table", e); + + while (inflight.get() != 0) { + LOG.info("Operations in flight, waiting"); + Thread.sleep(INFLIGHT_PAUSE_MS); } + + // Shut down the executor + executor.shutdown(); + if (!executor.awaitTermination(1, TimeUnit.MINUTES)) { + LOG.warn("Pool did not shut down cleanly"); + } + // Close the connection try { conn.close(); } catch (Exception e) { @@ -573,10 +640,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { if (warcHeader.getRecordType().equals("response") && targetURI != null) { final String contentType = warcHeader.getField("WARC-Identified-Payload-Type"); if (contentType != null) { - LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID); - // Make row key - byte[] rowKey; try { rowKey = rowKeyFromTargetURI(targetURI); @@ -590,22 +654,21 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { } // Get the content and calculate the CRC64 - final byte[] content = value.getRecord().getContent(); final CRC64 crc = new CRC64(); crc.update(content); final long crc64 = crc.getValue(); + LOG.info("{}: content {} bytes, crc64={}", targetURI, content.length, + Bytes.toHex(Bytes.toBytes(crc64))); // Store to HBase - - final long ts = getCurrentTime(); + final long ts = getSequence(); final Put put = new Put(rowKey); put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, ts, content); put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts, Bytes.toBytes(content.length)); put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts, Bytes.toBytes(contentType)); put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, ts, Bytes.toBytes(crc64)); - put.addColumn(INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, ts, Bytes.toBytes(recordID)); put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts, Bytes.toBytes(targetURI)); put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, ts, Bytes.toBytes(warcHeader.getDateString())); @@ -613,20 +676,32 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { if (ipAddr != null) { put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr)); } - mutator.mutate(put); + long pending = inflight.incrementAndGet(); + while (pending > MAX_INFLIGHT) { + LOG.info("Too many operations in flight, waiting"); + Thread.sleep(INFLIGHT_PAUSE_MS); + pending = inflight.get(); + } + final long putStartTime = System.currentTimeMillis(); + final CompletableFuture putFuture = table.put(put); + putFuture.thenRun(() -> { + inflight.decrementAndGet(); + if (!putFuture.isCompletedExceptionally()) { + output.getCounter(Counts.RPC_TIME_MS) + .increment(System.currentTimeMillis() - putStartTime); + output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(put.heapSize()); + } + }); // Write records out for later verification, one per HBase field except for the // content record, which will be verified by CRC64. - - output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts), - new BytesWritable(Bytes.toBytes(crc64))); output.write( new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts), new BytesWritable(Bytes.toBytes(content.length))); output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts), new BytesWritable(Bytes.toBytes(contentType))); - output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, ts), - new BytesWritable(Bytes.toBytes(recordID))); + output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts), + new BytesWritable(Bytes.toBytes(crc64))); output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts), new BytesWritable(Bytes.toBytes(targetURI))); output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, ts), @@ -635,54 +710,44 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts), new BytesWritable(Bytes.toBytes(ipAddr))); } - } - } - } - private byte[] rowKeyFromTargetURI(final String targetUri) - throws URISyntaxException, IllegalArgumentException { - final URI uri = new URI(targetUri); - // Ignore the scheme - // Reverse the components of the hostname - String reversedHost; - if (uri.getHost() != null) { - final StringBuilder sb = new StringBuilder(); - final String[] hostComponents = uri.getHost().split("\\."); - for (int i = hostComponents.length - 1; i >= 0; i--) { - sb.append(hostComponents[i]); - if (i != 0) { - sb.append('.'); + if (doIncrements) { + // The URLs cf is not tracked for correctness. For now it is used only to exercise + // Increments, to drive some read load during ingest. They can be verified with a + // reducer to sum increments per row and then compare the final count to the table + // data. This is left as a future exercise. + final byte[] refQual = Bytes.add(REF_QUALIFIER, SEP, rowKey); + for (String refUri : extractUrls(content)) { + try { + byte[] urlRowKey = rowKeyFromTargetURI(refUri); + LOG.debug(" -> {}", refUri); + final Increment increment = new Increment(urlRowKey); + increment.setTimestamp(ts); + increment.addColumn(URL_FAMILY_NAME, refQual, 1); + pending = inflight.incrementAndGet(); + while (pending > MAX_INFLIGHT) { + LOG.info("Too many operations in flight, waiting"); + Thread.sleep(INFLIGHT_PAUSE_MS); + pending = inflight.get(); + } + final long incrStartTime = System.currentTimeMillis(); + final CompletableFuture incrFuture = table.increment(increment); + incrFuture.thenRun(() -> { + inflight.decrementAndGet(); + if (!incrFuture.isCompletedExceptionally()) { + output.getCounter(Counts.RPC_TIME_MS) + .increment(System.currentTimeMillis() - incrStartTime); + output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(increment.heapSize()); + } + }); + } catch (IllegalArgumentException | URISyntaxException e) { + LOG.debug("Could not make a row key for URI " + refUri + ", ignoring", e); + } + } } } - reversedHost = sb.toString(); - } else { - throw new IllegalArgumentException("URI is missing host component"); } - final StringBuilder sb = new StringBuilder(); - sb.append(reversedHost); - if (uri.getPort() >= 0) { - sb.append(':'); - sb.append(uri.getPort()); - } - if (uri.getPath() != null) { - sb.append('/'); - sb.append(uri.getPath()); - } - if (uri.getQuery() != null) { - sb.append('?'); - sb.append(uri.getQuery()); - } - if (uri.getFragment() != null) { - sb.append('#'); - sb.append(uri.getFragment()); - } - if (sb.length() > HConstants.MAX_ROW_LENGTH) { - throw new IllegalArgumentException("Key would be too large (length=" + sb.length() - + ", limit=" + HConstants.MAX_ROW_LENGTH); - } - return Bytes.toBytes(sb.toString()); } - } } @@ -709,13 +774,18 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(NullWritable.class); TableMapReduceUtil.addDependencyJars(job); + boolean success = job.waitForCompletion(true); if (!success) { LOG.error("Failure during job " + job.getJobID()); } + final Counters counters = job.getCounters(); for (Counts c : Counts.values()) { - LOG.info(c + ": " + counters.findCounter(c).getValue()); + long value = counters.findCounter(c).getValue(); + if (value != 0) { + LOG.info(c + ": " + value); + } } if (counters.findCounter(Counts.UNREFERENCED).getValue() > 0) { LOG.error("Nonzero UNREFERENCED count from job " + job.getJobID()); @@ -725,6 +795,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { LOG.error("Nonzero CORRUPT count from job " + job.getJobID()); success = false; } + return success ? 0 : 1; } @@ -745,22 +816,25 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { public static class VerifyMapper extends Mapper { - private Connection conn; - private Table table; + protected Connection conn; + protected Table table; @Override protected void setup(final Context context) throws IOException, InterruptedException { - conn = ConnectionFactory.createConnection(context.getConfiguration()); - table = conn.getTable(getTablename(conn.getConfiguration())); + Configuration conf = context.getConfiguration(); + conn = ConnectionFactory.createConnection(conf); + table = conn.getTable(getTablename(conf)); } @Override protected void cleanup(final Context context) throws IOException, InterruptedException { + // Close the table try { table.close(); } catch (Exception e) { - LOG.warn("Exception closing Table", e); + LOG.warn("Exception closing table", e); } + // Close the connection try { conn.close(); } catch (Exception e) { @@ -778,95 +852,146 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(), key.getQualifierLength()); final long ts = key.getTimestamp(); - int retries = VERIFICATION_READ_RETRIES; - while (true) { - - if (Bytes.equals(INFO_FAMILY_NAME, family) && Bytes.equals(CRC_QUALIFIER, qualifier)) { - - final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength()); - final Result result = - table.get(new Get(row).addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER) - .addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER).setTimestamp(ts)); - final byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER); - if (content == null) { - if (retries-- > 0) { - continue; - } - LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content"); - output.getCounter(Counts.UNREFERENCED).increment(1); - return; - } else { - final CRC64 crc = new CRC64(); - crc.update(content); - if (crc.getValue() != expectedCRC64) { - LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content"); - output.getCounter(Counts.CORRUPT).increment(1); - return; - } - } - final byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER); - if (crc == null) { - if (retries-- > 0) { - continue; - } - LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c"); - output.getCounter(Counts.UNREFERENCED).increment(1); - return; - } - if (Bytes.toLong(crc) != expectedCRC64) { - if (retries-- > 0) { - continue; - } - LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch"); - output.getCounter(Counts.CORRUPT).increment(1); - return; - } - - } else { - - final Result result = - table.get(new Get(row).addColumn(family, qualifier).setTimestamp(ts)); - final byte[] bytes = result.getValue(family, qualifier); - if (bytes == null) { - if (retries-- > 0) { - continue; - } - LOG.error("Row " + Bytes.toStringBinary(row) + ": missing " - + Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier)); - output.getCounter(Counts.UNREFERENCED).increment(1); - return; - } - if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) { - if (retries-- > 0) { - continue; - } - LOG.error("Row " + Bytes.toStringBinary(row) + ": " + Bytes.toStringBinary(family) - + ":" + Bytes.toStringBinary(qualifier) + " mismatch"); - output.getCounter(Counts.CORRUPT).increment(1); - return; - } - + if (Bytes.equals(INFO_FAMILY_NAME, family) && Bytes.equals(CRC_QUALIFIER, qualifier)) { + final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength()); + final Get get = new Get(row).setTimestamp(ts).addFamily(CONTENT_FAMILY_NAME) + .addFamily(INFO_FAMILY_NAME); + final long startTime = System.currentTimeMillis(); + Result r; + try { + r = table.get(get); + output.getCounter(Counts.RPC_TIME_MS).increment(System.currentTimeMillis() - startTime); + } catch (Exception e) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e); + output.getCounter(Counts.UNREFERENCED).increment(1); + return; } - - // If we fell through to here all verification checks have succeeded, potentially after - // retries, and we must exit the while loop. + final byte[] crcBytes = r.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER); + if (crcBytes == null) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c"); + output.getCounter(Counts.UNREFERENCED).increment(1); + return; + } + if (Bytes.toLong(crcBytes) != expectedCRC64) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch"); + output.getCounter(Counts.CORRUPT).increment(1); + return; + } + // If we fell through to here all verification checks have succeeded for the info + // record. + output.getCounter(Counts.REFERENCED).increment(1); + final byte[] content = r.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER); + if (content == null) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content"); + output.getCounter(Counts.UNREFERENCED).increment(1); + return; + } else { + final CRC64 crc = new CRC64(); + crc.update(content); + if (crc.getValue() != expectedCRC64) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content"); + output.getCounter(Counts.CORRUPT).increment(1); + return; + } + } + // If we fell through to here all verification checks have succeeded for the content + // record. + output.getCounter(Counts.REFERENCED).increment(1); + } else { + final long startTime = System.currentTimeMillis(); + final Get get = new Get(row).setTimestamp(ts).addColumn(family, qualifier); + Result r; + try { + r = table.get(get); + output.getCounter(Counts.RPC_TIME_MS).increment(System.currentTimeMillis() - startTime); + } catch (Exception e) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e); + output.getCounter(Counts.UNREFERENCED).increment(1); + return; + } + final byte[] bytes = r.getValue(family, qualifier); + if (bytes == null) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": missing " + + Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier)); + output.getCounter(Counts.UNREFERENCED).increment(1); + return; + } + if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": " + Bytes.toStringBinary(family) + ":" + + Bytes.toStringBinary(qualifier) + " mismatch"); + output.getCounter(Counts.CORRUPT).increment(1); + return; + } + // If we fell through to here all verification checks have succeeded for the info + // record. output.getCounter(Counts.REFERENCED).increment(1); - break; - } } } } private static final AtomicLong counter = new AtomicLong(); + private static final int shift = 8; - private static long getCurrentTime() { - // Typical hybrid logical clock scheme. - // Take the current time, shift by 16 bits and zero those bits, and replace those bits - // with the low 16 bits of the atomic counter. Mask off the high bit too because timestamps - // cannot be negative. - return ((EnvironmentEdgeManager.currentTime() << 16) & 0x7fff_ffff_ffff_0000L) - | (counter.getAndIncrement() & 0xffffL); + private static long getSequence() { + long t = EnvironmentEdgeManager.currentTime(); + t <<= shift; + t |= (counter.getAndIncrement() % (1 << shift)); + return t; + } + + private static byte[] rowKeyFromTargetURI(final String targetUri) + throws IOException, URISyntaxException, IllegalArgumentException { + final URI uri = new URI(targetUri); + // Ignore the scheme + // Reverse the components of the hostname + String reversedHost; + if (uri.getHost() != null) { + final StringBuilder sb = new StringBuilder(); + final String[] hostComponents = uri.getHost().split("\\."); + for (int i = hostComponents.length - 1; i >= 0; i--) { + sb.append(hostComponents[i]); + if (i != 0) { + sb.append('.'); + } + } + reversedHost = sb.toString(); + } else { + throw new IllegalArgumentException("URI is missing host component"); + } + final ByteArrayOutputStream os = new ByteArrayOutputStream(); + os.write(reversedHost.getBytes(StandardCharsets.UTF_8)); + if (uri.getPort() >= 0) { + os.write(String.format(":%d", uri.getPort()).getBytes(StandardCharsets.UTF_8)); + } + os.write((byte) '|'); + if (uri.getPath() != null) { + os.write(uri.getPath().getBytes(StandardCharsets.UTF_8)); + } + if (uri.getQuery() != null) { + os.write(String.format("?%s", uri.getQuery()).getBytes(StandardCharsets.UTF_8)); + } + if (uri.getFragment() != null) { + os.write(String.format("#%s", uri.getFragment()).getBytes(StandardCharsets.UTF_8)); + } + if (os.size() > HConstants.MAX_ROW_LENGTH) { + throw new IllegalArgumentException( + "Key would be too large (length=" + os.size() + ", limit=" + HConstants.MAX_ROW_LENGTH); + } + return os.toByteArray(); + } + + static final Pattern URL_PATTERN = Pattern.compile( + "\\b((https?|ftp|file)://|(www|ftp)\\.)" + "[\\-A-Z0-9+&@#/%?=~_|$!:,\\.;]*[A-Z0-9+&@#/%=~_|$]", + Pattern.CASE_INSENSITIVE); + + private static Collection extractUrls(byte[] content) { + final Set list = new HashSet<>(); // uniques + final Matcher m = URL_PATTERN.matcher(new String(content, StandardCharsets.UTF_8)); + while (m.find()) { + list.add(m.group()); + } + return list; } }