diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java index 5dcd84e2462..40bad99dd14 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; @@ -156,6 +157,8 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r"); protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u"); + private static final int VERIFICATION_READ_RETRIES = 10; + public static enum Counts { REFERENCED, UNREFERENCED, CORRUPT } @@ -516,8 +519,6 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { job.setOutputValueClass(BytesWritable.class); TableMapReduceUtil.addDependencyJars(job); - LOG.info("Submitting job." + - " This will take time proportional to the number of input files, please be patient."); boolean success = job.waitForCompletion(true); if (!success) { LOG.error("Failure during job " + job.getJobID()); @@ -551,18 +552,20 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { Configuration conf; Connection conn; - Table table; + BufferedMutator mutator; @Override protected void setup(Context context) throws IOException, InterruptedException { - conn = ConnectionFactory.createConnection(context.getConfiguration()); - table = conn.getTable(getTablename(conn.getConfiguration())); + conf = context.getConfiguration(); + conn = ConnectionFactory.createConnection(conf); + mutator = conn.getBufferedMutator(getTablename(conf)); + mutator.setWriteBufferPeriodicFlush(10 * 1000); // default is 1 sec, increase to 10 } @Override protected void cleanup(Context context) throws IOException, InterruptedException { try { - table.close(); + mutator.close(); } catch (Exception e) { LOG.warn("Exception closing Table", e); } @@ -582,7 +585,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { if (warcHeader.getRecordType().equals("response") && targetURI != null) { String contentType = warcHeader.getField("WARC-Identified-Payload-Type"); if (contentType != null) { - LOG.debug("Processing record id=" + recordID + ", targetURI=\"" + targetURI + "\""); + LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID); long now = EnvironmentEdgeManager.currentTime(); // Make row key @@ -623,7 +626,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { if (ipAddr != null) { put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, now, Bytes.toBytes(ipAddr)); } - table.put(put); + mutator.mutate(put); // Write records out for later verification, one per HBase field except for the // content record, which will be verified by CRC64. @@ -651,10 +654,11 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { private byte[] rowKeyFromTargetURI(String targetUri) throws URISyntaxException, IllegalArgumentException { URI uri = new URI(targetUri); - StringBuffer sb = new StringBuffer(); // Ignore the scheme // Reverse the components of the hostname + String reversedHost; if (uri.getHost() != null) { + StringBuffer sb = new StringBuffer(); String[] hostComponents = uri.getHost().split("\\."); for (int i = hostComponents.length - 1; i >= 0; i--) { sb.append(hostComponents[i]); @@ -662,28 +666,31 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { sb.append('.'); } } + reversedHost = sb.toString(); } else { throw new IllegalArgumentException("URI is missing host component"); } - // Port - if (uri.getPort() != -1) { + StringBuffer sb = new StringBuffer(); + sb.append(reversedHost); + if (uri.getPort() >= 0) { sb.append(':'); sb.append(uri.getPort()); } - if (uri.getRawPath() != null) { - sb.append(uri.getRawPath()); + if (uri.getPath() != null) { + sb.append('/'); + sb.append(uri.getPath()); } - if (uri.getRawQuery() != null) { + if (uri.getQuery() != null) { sb.append('?'); - sb.append(uri.getRawQuery()); + sb.append(uri.getQuery()); } - if (uri.getRawFragment() != null) { + if (uri.getFragment() != null) { sb.append('#'); - sb.append(uri.getRawFragment()); + sb.append(uri.getFragment()); } - // Constrain the key size to the maximum allowed row key length - if (sb.length() > HConstants.MAX_ROW_LENGTH) { - sb.setLength(HConstants.MAX_ROW_LENGTH); + if (sb.length() > HConstants.MAX_ROW_LENGTH) { + throw new IllegalArgumentException("Key would be too large (length=" + sb.length() + + ", limit=" + HConstants.MAX_ROW_LENGTH); } return Bytes.toBytes(sb.toString()); } @@ -767,73 +774,92 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(), key.getQualifierLength()); long ts = key.getTimestamp(); + int retries = VERIFICATION_READ_RETRIES; - if (Bytes.equals(INFO_FAMILY_NAME, family) && - Bytes.equals(CRC_QUALIFIER, qualifier)) { + while (true) { - long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength()); + if (Bytes.equals(INFO_FAMILY_NAME, family) && + Bytes.equals(CRC_QUALIFIER, qualifier)) { - Result result = - table.get(new Get(row) + long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength()); + Result result = table.get(new Get(row) .addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER) .addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER) .setTimestamp(ts)); - - byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER); - if (content == null) { - LOG.info("Row " + Bytes.toStringBinary(row) + ": missing content"); - output.getCounter(Counts.UNREFERENCED).increment(1); - return; - } else { - CRC64 crc = new CRC64(); - crc.update(content); - if (crc.getValue() != expectedCRC64) { - LOG.info("Row " + Bytes.toStringBinary(row) + ": corrupt content"); + byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER); + if (content == null) { + if (retries-- > 0) { + continue; + } + LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content"); + output.getCounter(Counts.UNREFERENCED).increment(1); + return; + } else { + CRC64 crc = new CRC64(); + crc.update(content); + if (crc.getValue() != expectedCRC64) { + if (retries-- > 0) { + continue; + } + LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content"); + output.getCounter(Counts.CORRUPT).increment(1); + return; + } + } + byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER); + if (crc == null) { + if (retries-- > 0) { + continue; + } + LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c"); + output.getCounter(Counts.UNREFERENCED).increment(1); + return; + } + if (Bytes.toLong(crc) != expectedCRC64) { + if (retries-- > 0) { + continue; + } + LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch"); output.getCounter(Counts.CORRUPT).increment(1); return; } - } - byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER); - if (crc == null) { - LOG.info("Row " + Bytes.toStringBinary(row) + ": missing i:c"); - output.getCounter(Counts.UNREFERENCED).increment(1); - return; - } - if (Bytes.toLong(crc) != expectedCRC64) { - LOG.info("Row " + Bytes.toStringBinary(row) + ": i:c mismatch"); - output.getCounter(Counts.CORRUPT).increment(1); - return; - } - } else { + } else { - Result result = - table.get(new Get(row) + Result result = table.get(new Get(row) .addColumn(family, qualifier) .setTimestamp(ts)); + byte[] bytes = result.getValue(family, qualifier); + if (bytes == null) { + if (retries-- > 0) { + continue; + } + LOG.error("Row " + Bytes.toStringBinary(row) + ": missing " + + Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier)); + output.getCounter(Counts.UNREFERENCED).increment(1); + return; + } + if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) { + if (retries-- > 0) { + continue; + } + LOG.error("Row " + Bytes.toStringBinary(row) + ": " + + Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) + + " mismatch"); + output.getCounter(Counts.CORRUPT).increment(1); + return; + } - byte[] bytes = result.getValue(family, qualifier); - if (bytes == null) { - LOG.info("Row " + Bytes.toStringBinary(row) + ": missing " + - Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier)); - output.getCounter(Counts.UNREFERENCED).increment(1); - return; - } - if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) { - LOG.info("Row " + Bytes.toStringBinary(row) + ": " + - Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) + - " mismatch"); - output.getCounter(Counts.CORRUPT).increment(1); - return; } + // If we fell through to here all verification checks have succeeded, potentially after + // retries, and we must exit the while loop. + output.getCounter(Counts.REFERENCED).increment(1); + break; + } - - output.getCounter(Counts.REFERENCED).increment(1); } - } - } }