HBASE-26335 Minor improvements to IntegrationTestLoadCommonCrawl (#3731)

- Use BufferedMutator instead of Table.
- Improve row key generator.
- Improve retries and log levels.

Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Andrew Purtell 2021-10-08 10:00:51 -07:00
parent 5af02201d2
commit c3c7d36578
1 changed files with 94 additions and 68 deletions

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
@ -156,6 +157,8 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r"); protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r");
protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u"); protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
private static final int VERIFICATION_READ_RETRIES = 10;
public static enum Counts { public static enum Counts {
REFERENCED, UNREFERENCED, CORRUPT REFERENCED, UNREFERENCED, CORRUPT
} }
@ -516,8 +519,6 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
job.setOutputValueClass(BytesWritable.class); job.setOutputValueClass(BytesWritable.class);
TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.addDependencyJars(job);
LOG.info("Submitting job." +
" This will take time proportional to the number of input files, please be patient.");
boolean success = job.waitForCompletion(true); boolean success = job.waitForCompletion(true);
if (!success) { if (!success) {
LOG.error("Failure during job " + job.getJobID()); LOG.error("Failure during job " + job.getJobID());
@ -551,18 +552,20 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
Configuration conf; Configuration conf;
Connection conn; Connection conn;
Table table; BufferedMutator mutator;
@Override @Override
protected void setup(Context context) throws IOException, InterruptedException { protected void setup(Context context) throws IOException, InterruptedException {
conn = ConnectionFactory.createConnection(context.getConfiguration()); conf = context.getConfiguration();
table = conn.getTable(getTablename(conn.getConfiguration())); conn = ConnectionFactory.createConnection(conf);
mutator = conn.getBufferedMutator(getTablename(conf));
mutator.setWriteBufferPeriodicFlush(10 * 1000); // default is 1 sec, increase to 10
} }
@Override @Override
protected void cleanup(Context context) throws IOException, InterruptedException { protected void cleanup(Context context) throws IOException, InterruptedException {
try { try {
table.close(); mutator.close();
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Exception closing Table", e); LOG.warn("Exception closing Table", e);
} }
@ -582,7 +585,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
if (warcHeader.getRecordType().equals("response") && targetURI != null) { if (warcHeader.getRecordType().equals("response") && targetURI != null) {
String contentType = warcHeader.getField("WARC-Identified-Payload-Type"); String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
if (contentType != null) { if (contentType != null) {
LOG.debug("Processing record id=" + recordID + ", targetURI=\"" + targetURI + "\""); LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID);
long now = EnvironmentEdgeManager.currentTime(); long now = EnvironmentEdgeManager.currentTime();
// Make row key // Make row key
@ -623,7 +626,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
if (ipAddr != null) { if (ipAddr != null) {
put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, now, Bytes.toBytes(ipAddr)); put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, now, Bytes.toBytes(ipAddr));
} }
table.put(put); mutator.mutate(put);
// Write records out for later verification, one per HBase field except for the // Write records out for later verification, one per HBase field except for the
// content record, which will be verified by CRC64. // content record, which will be verified by CRC64.
@ -651,10 +654,11 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
private byte[] rowKeyFromTargetURI(String targetUri) private byte[] rowKeyFromTargetURI(String targetUri)
throws URISyntaxException, IllegalArgumentException { throws URISyntaxException, IllegalArgumentException {
URI uri = new URI(targetUri); URI uri = new URI(targetUri);
StringBuffer sb = new StringBuffer();
// Ignore the scheme // Ignore the scheme
// Reverse the components of the hostname // Reverse the components of the hostname
String reversedHost;
if (uri.getHost() != null) { if (uri.getHost() != null) {
StringBuffer sb = new StringBuffer();
String[] hostComponents = uri.getHost().split("\\."); String[] hostComponents = uri.getHost().split("\\.");
for (int i = hostComponents.length - 1; i >= 0; i--) { for (int i = hostComponents.length - 1; i >= 0; i--) {
sb.append(hostComponents[i]); sb.append(hostComponents[i]);
@ -662,28 +666,31 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
sb.append('.'); sb.append('.');
} }
} }
reversedHost = sb.toString();
} else { } else {
throw new IllegalArgumentException("URI is missing host component"); throw new IllegalArgumentException("URI is missing host component");
} }
// Port StringBuffer sb = new StringBuffer();
if (uri.getPort() != -1) { sb.append(reversedHost);
if (uri.getPort() >= 0) {
sb.append(':'); sb.append(':');
sb.append(uri.getPort()); sb.append(uri.getPort());
} }
if (uri.getRawPath() != null) { if (uri.getPath() != null) {
sb.append(uri.getRawPath()); sb.append('/');
sb.append(uri.getPath());
} }
if (uri.getRawQuery() != null) { if (uri.getQuery() != null) {
sb.append('?'); sb.append('?');
sb.append(uri.getRawQuery()); sb.append(uri.getQuery());
} }
if (uri.getRawFragment() != null) { if (uri.getFragment() != null) {
sb.append('#'); sb.append('#');
sb.append(uri.getRawFragment()); sb.append(uri.getFragment());
} }
// Constrain the key size to the maximum allowed row key length if (sb.length() > HConstants.MAX_ROW_LENGTH) {
if (sb.length() > HConstants.MAX_ROW_LENGTH) { throw new IllegalArgumentException("Key would be too large (length=" + sb.length() +
sb.setLength(HConstants.MAX_ROW_LENGTH); ", limit=" + HConstants.MAX_ROW_LENGTH);
} }
return Bytes.toBytes(sb.toString()); return Bytes.toBytes(sb.toString());
} }
@ -767,73 +774,92 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(), byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(),
key.getQualifierLength()); key.getQualifierLength());
long ts = key.getTimestamp(); long ts = key.getTimestamp();
int retries = VERIFICATION_READ_RETRIES;
if (Bytes.equals(INFO_FAMILY_NAME, family) && while (true) {
Bytes.equals(CRC_QUALIFIER, qualifier)) {
long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength()); if (Bytes.equals(INFO_FAMILY_NAME, family) &&
Bytes.equals(CRC_QUALIFIER, qualifier)) {
Result result = long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
table.get(new Get(row) Result result = table.get(new Get(row)
.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER) .addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER)
.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER) .addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER)
.setTimestamp(ts)); .setTimestamp(ts));
byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER); if (content == null) {
if (content == null) { if (retries-- > 0) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": missing content"); continue;
output.getCounter(Counts.UNREFERENCED).increment(1); }
return; LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content");
} else { output.getCounter(Counts.UNREFERENCED).increment(1);
CRC64 crc = new CRC64(); return;
crc.update(content); } else {
if (crc.getValue() != expectedCRC64) { CRC64 crc = new CRC64();
LOG.info("Row " + Bytes.toStringBinary(row) + ": corrupt content"); crc.update(content);
if (crc.getValue() != expectedCRC64) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
}
byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
if (crc == null) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (Bytes.toLong(crc) != expectedCRC64) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
output.getCounter(Counts.CORRUPT).increment(1); output.getCounter(Counts.CORRUPT).increment(1);
return; return;
} }
}
byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
if (crc == null) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": missing i:c");
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (Bytes.toLong(crc) != expectedCRC64) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
} else { } else {
Result result = Result result = table.get(new Get(row)
table.get(new Get(row)
.addColumn(family, qualifier) .addColumn(family, qualifier)
.setTimestamp(ts)); .setTimestamp(ts));
byte[] bytes = result.getValue(family, qualifier);
if (bytes == null) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": missing " +
Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
if (retries-- > 0) {
continue;
}
LOG.error("Row " + Bytes.toStringBinary(row) + ": " +
Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) +
" mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
byte[] bytes = result.getValue(family, qualifier);
if (bytes == null) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": missing " +
Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
output.getCounter(Counts.UNREFERENCED).increment(1);
return;
}
if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
LOG.info("Row " + Bytes.toStringBinary(row) + ": " +
Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) +
" mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
} }
// If we fell through to here all verification checks have succeeded, potentially after
// retries, and we must exit the while loop.
output.getCounter(Counts.REFERENCED).increment(1);
break;
} }
output.getCounter(Counts.REFERENCED).increment(1);
} }
} }
} }
} }