HBASE-26335 Minor improvements to IntegrationTestLoadCommonCrawl (#3731)

HBASE-26335 Minor improvements to IntegrationTestLoadCommonCrawl

- Use BufferedMutator instead of Table.
- Improve row key generator.
- Improve retries and log levels.

Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Andrew Purtell 2021-10-08 10:00:51 -07:00 committed by GitHub
parent 39a20c528e
commit a384c239b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 94 additions and 68 deletions

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
@ -156,6 +157,8 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r");
protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
private static final int VERIFICATION_READ_RETRIES = 10;
public static enum Counts {
REFERENCED, UNREFERENCED, CORRUPT
}
@ -516,8 +519,6 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
job.setOutputValueClass(BytesWritable.class);
TableMapReduceUtil.addDependencyJars(job);
LOG.info("Submitting job." +
" This will take time proportional to the number of input files, please be patient.");
boolean success = job.waitForCompletion(true);
if (!success) {
LOG.error("Failure during job " + job.getJobID());
@ -551,18 +552,20 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
Configuration conf;
Connection conn;
Table table;
BufferedMutator mutator;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
conn = ConnectionFactory.createConnection(context.getConfiguration());
table = conn.getTable(getTablename(conn.getConfiguration()));
conf = context.getConfiguration();
conn = ConnectionFactory.createConnection(conf);
mutator = conn.getBufferedMutator(getTablename(conf));
mutator.setWriteBufferPeriodicFlush(10 * 1000); // default is 1 sec, increase to 10
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
try {
table.close();
mutator.close();
} catch (Exception e) {
LOG.warn("Exception closing Table", e);
}
@ -582,7 +585,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
if (warcHeader.getRecordType().equals("response") && targetURI != null) {
String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
if (contentType != null) {
LOG.debug("Processing record id=" + recordID + ", targetURI=\"" + targetURI + "\"");
LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID);
long now = EnvironmentEdgeManager.currentTime();
// Make row key
@ -623,7 +626,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
if (ipAddr != null) {
put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, now, Bytes.toBytes(ipAddr));
}
table.put(put);
mutator.mutate(put);
// Write records out for later verification, one per HBase field except for the
// content record, which will be verified by CRC64.
@ -651,10 +654,11 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
private byte[] rowKeyFromTargetURI(String targetUri)
throws URISyntaxException, IllegalArgumentException {
URI uri = new URI(targetUri);
StringBuffer sb = new StringBuffer();
// Ignore the scheme
// Reverse the components of the hostname
String reversedHost;
if (uri.getHost() != null) {
StringBuffer sb = new StringBuffer();
String[] hostComponents = uri.getHost().split("\\.");
for (int i = hostComponents.length - 1; i >= 0; i--) {
sb.append(hostComponents[i]);
@ -662,28 +666,31 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
sb.append('.');
}
}
reversedHost = sb.toString();
} else {
throw new IllegalArgumentException("URI is missing host component");
}
// Port
if (uri.getPort() != -1) {
StringBuffer sb = new StringBuffer();
sb.append(reversedHost);
if (uri.getPort() >= 0) {
sb.append(':');
sb.append(uri.getPort());
}
if (uri.getRawPath() != null) {
sb.append(uri.getRawPath());
if (uri.getPath() != null) {
sb.append('/');
sb.append(uri.getPath());
}
if (uri.getRawQuery() != null) {
if (uri.getQuery() != null) {
sb.append('?');
sb.append(uri.getRawQuery());
sb.append(uri.getQuery());
}
if (uri.getRawFragment() != null) {
if (uri.getFragment() != null) {
sb.append('#');
sb.append(uri.getRawFragment());
sb.append(uri.getFragment());
}
// Constrain the key size to the maximum allowed row key length
if (sb.length() > HConstants.MAX_ROW_LENGTH) {
sb.setLength(HConstants.MAX_ROW_LENGTH);
if (sb.length() > HConstants.MAX_ROW_LENGTH) {
throw new IllegalArgumentException("Key would be too large (length=" + sb.length() +
", limit=" + HConstants.MAX_ROW_LENGTH);
}
return Bytes.toBytes(sb.toString());
}
@ -767,73 +774,92 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(),
key.getQualifierLength());
long ts = key.getTimestamp();
int retries = VERIFICATION_READ_RETRIES;
if (Bytes.equals(INFO_FAMILY_NAME, family) &&
Bytes.equals(CRC_QUALIFIER, qualifier)) {
while (true) {
long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
if (Bytes.equals(INFO_FAMILY_NAME, family) &&
Bytes.equals(CRC_QUALIFIER, qualifier)) {
Result result =
table.get(new Get(row)
long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
Result result = table.get(new Get(row)
.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER)
.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER)
.setTimestamp(ts));
byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
if (content == null) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": missing content");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
} else {
CRC64 crc = new CRC64();
crc.update(content);
if (crc.getValue() != expectedCRC64) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": corrupt content");
byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
if (content == null) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
} else {
CRC64 crc = new CRC64();
crc.update(content);
if (crc.getValue() != expectedCRC64) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
}
byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
if (crc == null) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (Bytes.toLong(crc) != expectedCRC64) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
}
byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
if (crc == null) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": missing i:c");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (Bytes.toLong(crc) != expectedCRC64) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
} else {
} else {
Result result =
table.get(new Get(row)
Result result = table.get(new Get(row)
.addColumn(family, qualifier)
.setTimestamp(ts));
byte[] bytes = result.getValue(family, qualifier);
if (bytes == null) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing " +
Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": " +
Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) +
" mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
byte[] bytes = result.getValue(family, qualifier);
if (bytes == null) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": missing " +
Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": " +
Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) +
" mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
// If we fell through to here all verification checks have succeeded, potentially after
// retries, and we must exit the while loop.
output.getCounter(Counts.REFERENCED).increment(1);
break;
}
output.getCounter(Counts.REFERENCED).increment(1);
}
}
}
}