diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
new file mode 100644
index 00000000000..64a9540dda3
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
@@ -0,0 +1,838 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.test;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.IntegrationTestBase;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.test.util.CRC64;
+import org.apache.hadoop.hbase.test.util.warc.WARCInputFormat;
+import org.apache.hadoop.hbase.test.util.warc.WARCRecord;
+import org.apache.hadoop.hbase.test.util.warc.WARCWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+
+/**
+ * This integration test loads successful resource retrieval records from the Common Crawl
+ * (https://commoncrawl.org/) public dataset into an HBase table and writes records that can be
+ * used to later verify the presence and integrity of those records.
+ *
+ * Run like:
+ *
+ * ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl \
+ * -Dfs.s3n.awsAccessKeyId=<AWS access key> \
+ * -Dfs.s3n.awsSecretAccessKey=<AWS secret key> \
+ * /path/to/test-CC-MAIN-2021-10-warc.paths.gz \
+ * /path/to/tmp/warc-loader-output
+ *
+ *
+ * Access to the Common Crawl dataset in S3 is made available to anyone by Amazon AWS, but
+ * Hadoop's S3N filesystem still requires valid access credentials to initialize.
+ *
+ * The input path can either specify a directory or a file. The file may optionally be
+ * compressed with gzip. If a directory, the loader expects the directory to contain one or more
+ * WARC files from the Common Crawl dataset. If a file, the loader expects a list of Hadoop S3N
+ * URIs which point to S3 locations for one or more WARC files from the Common Crawl dataset,
+ * one URI per line. Lines should be terminated with the UNIX line terminator.
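+ *
+ * An illustrative (not actual) entry in such a list file, with placeholder path components:
+ *
+ *   s3n://commoncrawl/crawl-data/CC-MAIN-2021-10/segments/<segment>/warc/<file>.warc.gz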
+ *
+ * Included in hbase-it/src/test/resources/CC-MAIN-2021-10-warc.paths.gz is a list of all WARC
+ * files comprising the Q1 2021 crawl archive. There are 64,000 WARC files in this dataset, each
+ * containing ~1GB of gzipped data. The WARC files contain several record types, such as metadata,
+ * request, and response, but we only load the response record types. If the HBase table schema
+ * does not specify compression (the default), there is roughly a 10x expansion. Loading the full
+ * crawl archive results in a table approximately 640 TB in size.
+ *
+ * You can also split the Loader and Verify stages:
+ *
+ * Load with:
+ *
+ * ./bin/hbase 'org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl$Loader' \
+ * -files /path/to/hadoop-aws.jar \
+ * -Dfs.s3n.awsAccessKeyId=<AWS access key> \
+ * -Dfs.s3n.awsSecretAccessKey=<AWS secret key> \
+ * /path/to/test-CC-MAIN-2021-10-warc.paths.gz \
+ * /path/to/tmp/warc-loader-output
+ *
+ *
+ * Note: The hadoop-aws jar will be needed at runtime to instantiate the S3N filesystem. Use
+ * the -files ToolRunner argument to add it.
+ *
+ * Verify with:
+ *
+ * ./bin/hbase 'org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl$Verify' \
+ * /path/to/tmp/warc-loader-output
+ *
+ *
+ */
+public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadCommonCrawl.class);
+
+ protected static String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table";
+ protected static String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl";
+
+ protected static byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c");
+ protected static byte[] INFO_FAMILY_NAME = Bytes.toBytes("i");
+ protected static byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY;
+ protected static byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l");
+ protected static byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t");
+ protected static byte[] CRC_QUALIFIER = Bytes.toBytes("c");
+ protected static byte[] DATE_QUALIFIER = Bytes.toBytes("d");
+ protected static byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a");
+ protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r");
+ protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
+
+ public static enum Counts {
+ REFERENCED, UNREFERENCED, CORRUPT
+ }
+
+ Path warcFileInputDir = null;
+ Path outputDir = null;
+ String[] args;
+
+ protected int runLoader(Path warcFileInputDir, Path outputDir) throws Exception {
+ Loader loader = new Loader();
+ loader.setConf(conf);
+ return loader.run(warcFileInputDir, outputDir);
+ }
+
+ protected int runVerify(Path inputDir) throws Exception {
+ Verify verify = new Verify();
+ verify.setConf(conf);
+ return verify.run(inputDir);
+ }
+
+ @Override
+ public int run(String[] args) {
+ if (args.length > 0) {
+ warcFileInputDir = new Path(args[0]);
+ if (args.length > 1) {
+ outputDir = new Path(args[1]);
+ }
+ }
+ try {
+ if (warcFileInputDir == null) {
+ throw new IllegalArgumentException("WARC input file or directory not specified");
+ }
+ if (outputDir == null) {
+ throw new IllegalArgumentException("Output directory not specified");
+ }
+ int res = runLoader(warcFileInputDir, outputDir);
+ if (res != 0) {
+ LOG.error("Loader failed");
+ return -1;
+ }
+      res = runVerify(outputDir);
+      if (res != 0) {
+        LOG.error("Verify failed");
+        return -1;
+      }
+ } catch (Exception e) {
+ LOG.error("Tool failed with exception", e);
+ return -1;
+ }
+ return 0;
+ }
+
+ @Override
+ protected void processOptions(CommandLine cmd) {
+ processBaseOptions(cmd);
+ args = cmd.getArgs();
+ }
+
+ @Override
+ public void setUpCluster() throws Exception {
+ util = getTestingUtil(getConf());
+ boolean isDistributed = util.isDistributedCluster();
+ util.initializeCluster(isDistributed ? 1 : 3);
+ if (!isDistributed) {
+ util.startMiniMapReduceCluster();
+ }
+ this.setConf(util.getConfiguration());
+ }
+
+ @Override
+ public void cleanUpCluster() throws Exception {
+ super.cleanUpCluster();
+ if (util.isDistributedCluster()) {
+ util.shutdownMiniMapReduceCluster();
+ }
+ }
+
+ static TableName getTablename(Configuration c) {
+ return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
+ }
+
+ @Override
+ public TableName getTablename() {
+ return getTablename(getConf());
+ }
+
+ @Override
+  protected Set<String> getColumnFamilies() {
+    Set<String> families = new HashSet<>();
+ families.add(Bytes.toString(CONTENT_FAMILY_NAME));
+ families.add(Bytes.toString(INFO_FAMILY_NAME));
+ return families;
+ }
+
+ @Override
+ public int runTestFromCommandLine() throws Exception {
+ return ToolRunner.run(getConf(), this, args);
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ IntegrationTestingUtility.setUseDistributedCluster(conf);
+ int ret = ToolRunner.run(conf, new IntegrationTestLoadCommonCrawl(), args);
+ System.exit(ret);
+ }
+
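+  /**
+   * A Writable key identifying a single HBase cell: row, column family, qualifier, and
+   * timestamp. The Loader emits one of these (together with the expected value) for each field
+   * it wants verified, and the Verify job reads them back to check the table contents.
+   */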
+ public static class HBaseKeyWritable implements Writable {
+
+ private byte[] row;
+ private int rowOffset;
+ private int rowLength;
+ private byte[] family;
+ private int familyOffset;
+ private int familyLength;
+ private byte[] qualifier;
+ private int qualifierOffset;
+ private int qualifierLength;
+ private long ts;
+
+ public HBaseKeyWritable() { }
+
+ public HBaseKeyWritable(byte[] row, int rowOffset, int rowLength,
+ byte[] family, int familyOffset, int familyLength,
+ byte[] qualifier, int qualifierOffset, int qualifierLength, long ts) {
+ this.row = row;
+ this.rowOffset = rowOffset;
+ this.rowLength = rowLength;
+ this.family = family;
+ this.familyOffset = familyOffset;
+ this.familyLength = familyLength;
+ this.qualifier = qualifier;
+ this.qualifierOffset = qualifierOffset;
+ this.qualifierLength = qualifierLength;
+ this.ts = ts;
+ }
+
+ public HBaseKeyWritable(byte[] row, byte[] family, byte[] qualifier, long ts) {
+ this(row, 0, row.length,
+ family, 0, family.length,
+ qualifier, 0, qualifier != null ? qualifier.length : 0, ts);
+ }
+
+ public HBaseKeyWritable(byte[] row, byte[] family, long ts) {
+ this(row, family, HConstants.EMPTY_BYTE_ARRAY, ts);
+ }
+
+ public HBaseKeyWritable(Cell cell) {
+ this(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+ cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+ cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+ cell.getTimestamp());
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.row = Bytes.toBytes(in.readUTF());
+ this.rowOffset = 0;
+ this.rowLength = row.length;
+ this.family = Bytes.toBytes(in.readUTF());
+ this.familyOffset = 0;
+ this.familyLength = family.length;
+ this.qualifier = Bytes.toBytes(in.readUTF());
+ this.qualifierOffset = 0;
+ this.qualifierLength = qualifier.length;
+ this.ts = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(new String(row, rowOffset, rowLength, StandardCharsets.UTF_8));
+ out.writeUTF(new String(family, familyOffset, familyLength, StandardCharsets.UTF_8));
+ if (qualifier != null) {
+ out.writeUTF(new String(qualifier, qualifierOffset, qualifierLength,
+ StandardCharsets.UTF_8));
+ } else {
+ out.writeUTF("");
+ }
+ out.writeLong(ts);
+ }
+
+ public byte[] getRowArray() {
+ return row;
+ }
+
+ public void setRow(byte[] row) {
+ this.row = row;
+ }
+
+ public int getRowOffset() {
+ return rowOffset;
+ }
+
+ public void setRowOffset(int rowOffset) {
+ this.rowOffset = rowOffset;
+ }
+
+ public int getRowLength() {
+ return rowLength;
+ }
+
+ public void setRowLength(int rowLength) {
+ this.rowLength = rowLength;
+ }
+
+ public byte[] getFamilyArray() {
+ return family;
+ }
+
+ public void setFamily(byte[] family) {
+ this.family = family;
+ }
+
+ public int getFamilyOffset() {
+ return familyOffset;
+ }
+
+ public void setFamilyOffset(int familyOffset) {
+ this.familyOffset = familyOffset;
+ }
+
+ public int getFamilyLength() {
+ return familyLength;
+ }
+
+ public void setFamilyLength(int familyLength) {
+ this.familyLength = familyLength;
+ }
+
+ public byte[] getQualifierArray() {
+ return qualifier;
+ }
+
+ public void setQualifier(byte[] qualifier) {
+ this.qualifier = qualifier;
+ }
+
+ public int getQualifierOffset() {
+ return qualifierOffset;
+ }
+
+ public void setQualifierOffset(int qualifierOffset) {
+ this.qualifierOffset = qualifierOffset;
+ }
+
+ public int getQualifierLength() {
+ return qualifierLength;
+ }
+
+ public void setQualifierLength(int qualifierLength) {
+ this.qualifierLength = qualifierLength;
+ }
+
+ public long getTimestamp() {
+ return ts;
+ }
+
+ public void setTimestamp(long ts) {
+ this.ts = ts;
+ }
+ }
+
+ public static class Loader extends Configured implements Tool {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Loader.class);
+    private static final String USAGE = "Loader <warc file or directory> <output directory>";
+
+ void createSchema(TableName tableName) throws IOException {
+
+ try (Connection conn = ConnectionFactory.createConnection(getConf());
+ Admin admin = conn.getAdmin()) {
+ if (!admin.tableExists(tableName)) {
+
+ ColumnFamilyDescriptorBuilder contentFamilyBuilder =
+ ColumnFamilyDescriptorBuilder.newBuilder(CONTENT_FAMILY_NAME)
+ .setDataBlockEncoding(DataBlockEncoding.NONE)
+ .setBloomFilterType(BloomType.ROW)
+ .setMaxVersions(1000)
+ .setBlocksize(256 * 1024)
+ ;
+
+ ColumnFamilyDescriptorBuilder infoFamilyBuilder =
+ ColumnFamilyDescriptorBuilder.newBuilder(INFO_FAMILY_NAME)
+ .setDataBlockEncoding(DataBlockEncoding.NONE)
+ .setBloomFilterType(BloomType.ROWCOL)
+ .setMaxVersions(1000)
+ .setBlocksize(8 * 1024)
+ ;
+
+        Set<ColumnFamilyDescriptor> families = new HashSet<>();
+ families.add(contentFamilyBuilder.build());
+ families.add(infoFamilyBuilder.build());
+
+ TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamilies(families)
+ .build();
+
+ if (getConf().getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY,
+ HBaseTestingUtility.PRESPLIT_TEST_TABLE)) {
+ int numberOfServers = admin.getRegionServers().size();
+ if (numberOfServers == 0) {
+ throw new IllegalStateException("No live regionservers");
+ }
+ int regionsPerServer = getConf().getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
+ HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
+ int totalNumberOfRegions = numberOfServers * regionsPerServer;
+ LOG.info("Creating test table: " + tableDescriptor);
+ LOG.info("Number of live regionservers: " + numberOfServers + ", " +
+ "pre-splitting table into " + totalNumberOfRegions + " regions " +
+ "(default regions per server: " + regionsPerServer + ")");
+ byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
+ admin.createTable(tableDescriptor, splits);
+ } else {
+ LOG.info("Creating test table: " + tableDescriptor);
+ admin.createTable(tableDescriptor);
+ }
+ }
+ } catch (MasterNotRunningException e) {
+ LOG.error("Master not running", e);
+ throw new IOException(e);
+ }
+ }
+
+ int run(Path warcFileInput, Path outputDir)
+ throws IOException, ClassNotFoundException, InterruptedException {
+
+ createSchema(getTablename(getConf()));
+
+ Job job = Job.getInstance(getConf());
+ job.setJobName(Loader.class.getName());
+ job.setNumReduceTasks(0);
+ job.setJarByClass(getClass());
+ job.setMapperClass(LoaderMapper.class);
+ job.setInputFormatClass(WARCInputFormat.class);
+ FileSystem fs = FileSystem.get(warcFileInput.toUri(), getConf());
+ if (fs.getFileStatus(warcFileInput).isDirectory()) {
+ LOG.info("Using directory as WARC input path: " + warcFileInput);
+ FileInputFormat.setInputPaths(job, warcFileInput);
+ } else {
+ LOG.info("Getting WARC input paths from file: " + warcFileInput);
+      List<Path> paths = new LinkedList<>();
+ try (FSDataInputStream is = fs.open(warcFileInput)) {
+ InputStreamReader reader;
+ if (warcFileInput.getName().toLowerCase().endsWith(".gz")) {
+ reader = new InputStreamReader(new GZIPInputStream(is));
+ } else {
+ reader = new InputStreamReader(is);
+ }
+ try (BufferedReader br = new BufferedReader(reader)) {
+ String line;
+ while ((line = br.readLine()) != null) {
+ paths.add(new Path(line));
+ }
+ }
+ }
+ LOG.info("Read " + paths.size() + " WARC input paths from " + warcFileInput);
+ FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
+ }
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ SequenceFileOutputFormat.setOutputPath(job, outputDir);
+ SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+ job.setOutputKeyClass(HBaseKeyWritable.class);
+ job.setOutputValueClass(BytesWritable.class);
+ TableMapReduceUtil.addDependencyJars(job);
+
+ LOG.info("Submitting job." +
+ " This will take time proportional to the number of input files, please be patient.");
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ LOG.error("Failure during job " + job.getJobID());
+ }
+ return success ? 0 : 1;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println(USAGE);
+ return 1;
+ }
+ try {
+ Path warcFileInput = new Path(args[0]);
+ Path outputDir = new Path(args[1]);
+ return run(warcFileInput, outputDir);
+ } catch (NumberFormatException e) {
+ System.err.println("Parsing loader arguments failed: " + e.getMessage());
+ System.err.println(USAGE);
+ return 1;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(HBaseConfiguration.create(), new Loader(), args));
+ }
+
+    public static class LoaderMapper
+      extends Mapper<LongWritable, WARCWritable, HBaseKeyWritable, BytesWritable> {
+
+ Configuration conf;
+ Connection conn;
+ Table table;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ conn = ConnectionFactory.createConnection(context.getConfiguration());
+ table = conn.getTable(getTablename(conn.getConfiguration()));
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ try {
+ table.close();
+ } catch (Exception e) {
+ LOG.warn("Exception closing Table", e);
+ }
+ try {
+ conn.close();
+ } catch (Exception e) {
+ LOG.warn("Exception closing Connection", e);
+ }
+ }
+
+ @Override
+ protected void map(LongWritable key, WARCWritable value, Context output)
+ throws IOException, InterruptedException {
+ WARCRecord.Header warcHeader = value.getRecord().getHeader();
+ String recordID = warcHeader.getRecordID();
+ String targetURI = warcHeader.getTargetURI();
+ if (warcHeader.getRecordType().equals("response") && targetURI != null) {
+ String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
+ if (contentType != null) {
+ LOG.debug("Processing record id=" + recordID + ", targetURI=\"" + targetURI + "\"");
+ long now = System.currentTimeMillis();
+
+ // Make row key
+
+ byte[] rowKey;
+ try {
+ rowKey = rowKeyFromTargetURI(targetURI);
+ } catch (IllegalArgumentException e) {
+ LOG.debug("Could not make a row key for record " + recordID + ", ignoring", e);
+ return;
+ } catch (URISyntaxException e) {
+ LOG.warn("Could not parse URI \"" + targetURI + "\" for record " + recordID +
+ ", ignoring");
+ return;
+ }
+
+ // Get the content and calculate the CRC64
+
+ byte[] content = value.getRecord().getContent();
+ CRC64 crc = new CRC64();
+ crc.update(content);
+ long crc64 = crc.getValue();
+
+ // Store to HBase
+
+ Put put = new Put(rowKey);
+ put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, now, content);
+ put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, now,
+ Bytes.toBytes(content.length));
+ put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, now,
+ Bytes.toBytes(contentType));
+ put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, now, Bytes.toBytes(crc64));
+ put.addColumn(INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, now, Bytes.toBytes(recordID));
+ put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, now, Bytes.toBytes(targetURI));
+ put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, now,
+ Bytes.toBytes(warcHeader.getDateString()));
+ String ipAddr = warcHeader.getField("WARC-IP-Address");
+ if (ipAddr != null) {
+ put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, now, Bytes.toBytes(ipAddr));
+ }
+ table.put(put);
+
+ // Write records out for later verification, one per HBase field except for the
+ // content record, which will be verified by CRC64.
+
+ output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, now),
+ new BytesWritable(Bytes.toBytes(crc64)));
+ output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER,
+ now), new BytesWritable(Bytes.toBytes(content.length)));
+ output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER,
+ now), new BytesWritable(Bytes.toBytes(contentType)));
+ output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, RECORD_ID_QUALIFIER,
+ now), new BytesWritable(Bytes.toBytes(recordID)));
+ output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, TARGET_URI_QUALIFIER,
+ now), new BytesWritable(Bytes.toBytes(targetURI)));
+ output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, now),
+ new BytesWritable(Bytes.toBytes(warcHeader.getDateString())));
+ if (ipAddr != null) {
+ output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER,
+ now), new BytesWritable(Bytes.toBytes(ipAddr)));
+ }
+ }
+ }
+ }
+
+ private byte[] rowKeyFromTargetURI(String targetUri)
+ throws URISyntaxException, IllegalArgumentException {
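+      // Row keys are derived from the target URI: the scheme is dropped, the hostname
+      // components are reversed, and the port, path, query, and fragment are appended.
+      // e.g. (hypothetical) https://www.example.com/a?q=1 becomes com.example.www/a?q=1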
+ URI uri = new URI(targetUri);
+ StringBuffer sb = new StringBuffer();
+ // Ignore the scheme
+ // Reverse the components of the hostname
+ if (uri.getHost() != null) {
+ String[] hostComponents = uri.getHost().split("\\.");
+ for (int i = hostComponents.length - 1; i >= 0; i--) {
+ sb.append(hostComponents[i]);
+ if (i != 0) {
+ sb.append('.');
+ }
+ }
+ } else {
+ throw new IllegalArgumentException("URI is missing host component");
+ }
+ // Port
+ if (uri.getPort() != -1) {
+ sb.append(':');
+ sb.append(uri.getPort());
+ }
+ if (uri.getRawPath() != null) {
+ sb.append(uri.getRawPath());
+ }
+ if (uri.getRawQuery() != null) {
+ sb.append('?');
+ sb.append(uri.getRawQuery());
+ }
+ if (uri.getRawFragment() != null) {
+ sb.append('#');
+ sb.append(uri.getRawFragment());
+ }
+ // Constrain the key size to the maximum allowed row key length
+ if (sb.length() > HConstants.MAX_ROW_LENGTH) {
+ sb.setLength(HConstants.MAX_ROW_LENGTH);
+ }
+ return Bytes.toBytes(sb.toString());
+ }
+
+ }
+ }
+
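+  /**
+   * A SequenceFileInputFormat that disables input splits so each Loader output file is
+   * processed in full by a single Verify mapper.
+   */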
+  public static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
+ @Override
+ protected boolean isSplitable(JobContext context, Path filename) {
+ return false;
+ }
+ }
+
+ public static class Verify extends Configured implements Tool {
+
+ public static final Logger LOG = LoggerFactory.getLogger(Verify.class);
+    public static final String USAGE = "Verify <loader output directory>";
+
+ int run(Path inputDir) throws IOException, ClassNotFoundException, InterruptedException {
+ Job job = Job.getInstance(getConf());
+ job.setJobName(Verify.class.getName());
+ job.setJarByClass(getClass());
+ job.setMapperClass(VerifyMapper.class);
+ job.setInputFormatClass(OneFilePerMapperSFIF.class);
+ FileInputFormat.setInputPaths(job, inputDir);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+ TableMapReduceUtil.addDependencyJars(job);
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ LOG.error("Failure during job " + job.getJobID());
+ }
+ Counters counters = job.getCounters();
+ for (Counts c: Counts.values()) {
+ LOG.info(c + ": " + counters.findCounter(c).getValue());
+ }
+ return success ? 0 : 1;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+      if (args.length < 1) {
+ System.err.println(USAGE);
+ return 1;
+ }
+ Path loaderOutput = new Path(args[0]);
+ return run(loaderOutput);
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(HBaseConfiguration.create(), new Verify(), args));
+ }
+
+    public static class VerifyMapper
+      extends Mapper<HBaseKeyWritable, BytesWritable, NullWritable, NullWritable> {
+
+ Connection conn;
+ Table table;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ conn = ConnectionFactory.createConnection(context.getConfiguration());
+ table = conn.getTable(getTablename(conn.getConfiguration()));
+ }
+
+ @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+ table.close();
+ conn.close();
+ }
+
+ @Override
+ protected void map(HBaseKeyWritable key, BytesWritable value, Context output)
+ throws IOException, InterruptedException {
+
+ byte[] row = Bytes.copy(key.getRowArray(), key.getRowOffset(), key.getRowLength());
+ byte[] family = Bytes.copy(key.getFamilyArray(), key.getFamilyOffset(),
+ key.getFamilyLength());
+ byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(),
+ key.getQualifierLength());
+ long ts = key.getTimestamp();
+
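+      // For the CRC64 record, re-read the stored content, recompute its CRC64, and also check
+      // the persisted i:c cell against the expected checksum.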
+ if (Bytes.equals(INFO_FAMILY_NAME, family) &&
+ Bytes.equals(CRC_QUALIFIER, qualifier)) {
+
+ long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
+
+ Result result =
+ table.get(new Get(row)
+ .addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER)
+ .addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER)
+ .setTimestamp(ts));
+
+ byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
+ if (content == null) {
+ LOG.info("Row " + Bytes.toStringBinary(row) + ": missing content");
+ output.getCounter(Counts.UNREFERENCED).increment(1);
+ return;
+ } else {
+ CRC64 crc = new CRC64();
+ crc.update(content);
+ if (crc.getValue() != expectedCRC64) {
+ LOG.info("Row " + Bytes.toStringBinary(row) + ": corrupt content");
+ output.getCounter(Counts.CORRUPT).increment(1);
+ return;
+ }
+ }
+ byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
+ if (crc == null) {
+ LOG.info("Row " + Bytes.toStringBinary(row) + ": missing i:c");
+ output.getCounter(Counts.UNREFERENCED).increment(1);
+ return;
+ }
+ if (Bytes.toLong(crc) != expectedCRC64) {
+ LOG.info("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
+ output.getCounter(Counts.CORRUPT).increment(1);
+ return;
+ }
+
+ } else {
+
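+        // For every other recorded field, fetch the cell at the same timestamp and compare it
+        // byte-for-byte with the value the Loader emitted.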
+ Result result =
+ table.get(new Get(row)
+ .addColumn(family, qualifier)
+ .setTimestamp(ts));
+
+ byte[] bytes = result.getValue(family, qualifier);
+ if (bytes == null) {
+ LOG.info("Row " + Bytes.toStringBinary(row) + ": missing " +
+ Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
+ output.getCounter(Counts.UNREFERENCED).increment(1);
+ return;
+ }
+ if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
+ LOG.info("Row " + Bytes.toStringBinary(row) + ": " +
+ Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) +
+ " mismatch");
+ output.getCounter(Counts.CORRUPT).increment(1);
+ return;
+ }
+
+ }
+
+ output.getCounter(Counts.REFERENCED).increment(1);
+ }
+
+ }
+
+ }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/CRC64.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/CRC64.java
new file mode 100644
index 00000000000..4b4eacb5254
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/CRC64.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.test.util;
+
+// Cribbed from
+// hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/CRC64.java
+
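+/**
+ * Computes a CRC-64 checksum over byte arrays using a table-driven implementation. Create an
+ * instance, call update() one or more times, then read the checksum with getValue().
+ */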
+public class CRC64 {
+ private static final long POLY = 0x9a6c9329ac4bc9b5L;
+ private static final int TABLE_LENGTH = 256;
+ private static final long[] TABLE = new long[TABLE_LENGTH];
+ static {
+ /* Initialize a table constructed from POLY */
+ for (int n = 0; n < TABLE_LENGTH; ++n) {
+ long crc = n;
+ for (int i = 0; i < 8; ++i) {
+ if ((crc & 1) == 1) {
+ crc = (crc >>> 1) ^ POLY;
+ } else {
+ crc >>>= 1;
+ }
+ }
+ TABLE[n] = crc;
+ }
+ }
+
+ private long value = -1;
+
+ public void reset() {
+ value = -1;
+ }
+
+ public void update(byte[] input, int off, int len) {
+ for (int i = off; i < off+len; i++) {
+ value = TABLE[(input[i] ^ (int) value) & 0xFF] ^ (value >>> 8);
+ }
+ }
+
+ public void update(byte[] input) {
+ update(input, 0, input.length);
+ }
+
+ public long getValue() {
+    // Return the complement of 'value' to complete the calculation.
+ return ~value;
+ }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCFileReader.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCFileReader.java
new file mode 100644
index 00000000000..4e7ee5ad5d9
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCFileReader.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2014 Martin Kleppmann
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+package org.apache.hadoop.hbase.test.util.warc;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads {@link WARCRecord}s from a WARC file, using Hadoop's filesystem APIs. (This means you
+ * can read from HDFS, S3 or any other filesystem supported by Hadoop). This implementation is
+ * not tied to the MapReduce APIs -- that link is provided by the mapred
+ * {@link com.martinkl.warc.mapred.WARCInputFormat} and the mapreduce
+ * {@link com.martinkl.warc.mapreduce.WARCInputFormat}.
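+ *
+ * A minimal usage sketch, assuming an existing Hadoop Configuration named conf
+ * ("records.warc.gz" is a placeholder path):
+ *
+ * ```java
+ * WARCFileReader reader = new WARCFileReader(conf, new Path("records.warc.gz"));
+ * WARCRecord record = reader.read(); // throws EOFException once no records remain
+ * reader.close();
+ * ```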
+ */
+public class WARCFileReader {
+ private static final Logger logger = LoggerFactory.getLogger(WARCFileReader.class);
+
+ private final long fileSize;
+ private CountingInputStream byteStream = null;
+ private DataInputStream dataStream = null;
+ private long bytesRead = 0, recordsRead = 0;
+
+ /**
+ * Opens a file for reading. If the filename ends in `.gz`, it is automatically decompressed
+ * on the fly.
+ * @param conf The Hadoop configuration.
+ * @param filePath The Hadoop path to the file that should be read.
+ */
+ public WARCFileReader(Configuration conf, Path filePath) throws IOException {
+ FileSystem fs = filePath.getFileSystem(conf);
+ this.fileSize = fs.getFileStatus(filePath).getLen();
+ logger.info("Reading from " + filePath);
+
+ CompressionCodec codec = filePath.getName().endsWith(".gz") ?
+ WARCFileWriter.getGzipCodec(conf) : null;
+ byteStream = new CountingInputStream(new BufferedInputStream(fs.open(filePath)));
+ dataStream = new DataInputStream(codec == null ? byteStream :
+ codec.createInputStream(byteStream));
+ }
+
+ /**
+ * Reads the next record from the file.
+ * @return The record that was read.
+ */
+ public WARCRecord read() throws IOException {
+ WARCRecord record = new WARCRecord(dataStream);
+ recordsRead++;
+ return record;
+ }
+
+ /**
+ * Closes the file. No more reading is possible after the file has been closed.
+ */
+ public void close() throws IOException {
+ if (dataStream != null) {
+ dataStream.close();
+ }
+ byteStream = null;
+ dataStream = null;
+ }
+
+ /**
+ * Returns the number of records that have been read since the file was opened.
+ */
+ public long getRecordsRead() {
+ return recordsRead;
+ }
+
+ /**
+ * Returns the number of bytes that have been read from file since it was opened.
+ * If the file is compressed, this refers to the compressed file size.
+ */
+ public long getBytesRead() {
+ return bytesRead;
+ }
+
+ /**
+ * Returns the proportion of the file that has been read, as a number between 0.0
+ * and 1.0.
+ */
+ public float getProgress() {
+ if (fileSize == 0) {
+ return 1.0f;
+ }
+ return (float) bytesRead / (float) fileSize;
+ }
+
+ private class CountingInputStream extends FilterInputStream {
+ public CountingInputStream(InputStream in) {
+ super(in);
+ }
+
+ @Override
+ public int read() throws IOException {
+ int result = in.read();
+ if (result != -1) {
+ bytesRead++;
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int result = in.read(b, off, len);
+ if (result != -1) {
+ bytesRead += result;
+ }
+ return result;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ long result = in.skip(n);
+ bytesRead += result;
+ return result;
+ }
+ }
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCFileWriter.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCFileWriter.java
new file mode 100644
index 00000000000..5f361cd81cd
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCFileWriter.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2014 Martin Kleppmann
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+package org.apache.hadoop.hbase.test.util.warc;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes {@link WARCRecord}s to a WARC file, using Hadoop's filesystem APIs. (This means you
+ * can write to HDFS, S3 or any other filesystem supported by Hadoop). This implementation is
+ * not tied to the MapReduce APIs -- that link is provided by the mapred
+ * {@link com.martinkl.warc.mapred.WARCOutputFormat} and the mapreduce
+ * {@link com.martinkl.warc.mapreduce.WARCOutputFormat}.
+ *
+ * WARCFileWriter keeps track of how much data it has written (optionally gzip-compressed);
+ * when the file becomes larger than some threshold, it is automatically closed and a
+ * new segment is started. A segment number is appended to the filename for that purpose.
+ * The segment number always starts at 00000, and by default a new segment is started when
+ * the file size exceeds 1GB. To change the target size for a segment, you can set the
+ * `warc.output.segment.size` key in the Hadoop configuration to the number of bytes.
+ * (Files may actually be a bit larger than this threshold, since we finish writing the
+ * current record before opening a new file.)
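+ *
+ * For example, to roll over to a new segment at roughly 256 MB instead of the default 1 GB:
+ *
+ * ```java
+ * Configuration conf = new Configuration();
+ * conf.setLong("warc.output.segment.size", 256L * 1024 * 1024);
+ * ```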
+ */
+public class WARCFileWriter {
+ private static final Logger logger = LoggerFactory.getLogger(WARCFileWriter.class);
+ public static final long DEFAULT_MAX_SEGMENT_SIZE = 1000000000L; // 1 GB
+
+ private final Configuration conf;
+ private final CompressionCodec codec;
+ private final Path workOutputPath;
+ private final Progressable progress;
+ private final String extensionFormat;
+ private final long maxSegmentSize;
+ private long segmentsCreated = 0, segmentsAttempted = 0, bytesWritten = 0;
+ private CountingOutputStream byteStream;
+ private DataOutputStream dataStream;
+
+ /**
+ * Creates a WARC file, and opens it for writing. If a file with the same name already
+ * exists, an attempt number in the filename is incremented until we find a file that
+ * doesn't already exist.
+ *
+ * @param conf The Hadoop configuration.
+ * @param codec If null, the file is uncompressed. If non-null, this compression codec
+ * will be used. The codec's default file extension is appended to the filename.
+ * @param workOutputPath The directory and filename prefix to which the data should be
+ * written. We append a segment number and filename extensions to it.
+ */
+ public WARCFileWriter(Configuration conf, CompressionCodec codec, Path workOutputPath)
+ throws IOException {
+ this(conf, codec, workOutputPath, null);
+ }
+
+ /**
+ * Creates a WARC file, and opens it for writing. If a file with the same name already
+ * exists, it is *overwritten*. Note that this is different behaviour from the other
+ * constructor. Yes, this sucks. It will probably change in a future version.
+ *
+ * @param conf The Hadoop configuration.
+ * @param codec If null, the file is uncompressed. If non-null, this compression codec
+ * will be used. The codec's default file extension is appended to the filename.
+ * @param workOutputPath The directory and filename prefix to which the data should be
+ * written. We append a segment number and filename extensions to it.
+ * @param progress An object used by the mapred API for tracking a task's progress.
+ */
+ public WARCFileWriter(Configuration conf, CompressionCodec codec, Path workOutputPath,
+ Progressable progress) throws IOException {
+ this.conf = conf;
+ this.codec = codec;
+ this.workOutputPath = workOutputPath;
+ this.progress = progress;
+ this.extensionFormat = ".seg-%05d.attempt-%05d.warc" +
+ (codec == null ? "" : codec.getDefaultExtension());
+ this.maxSegmentSize = conf.getLong("warc.output.segment.size", DEFAULT_MAX_SEGMENT_SIZE);
+ createSegment();
+ }
+
+ /**
+ * Instantiates a Hadoop codec for compressing and decompressing Gzip files. This is the
+ * most common compression applied to WARC files.
+ *
+ * @param conf The Hadoop configuration.
+ */
+ public static CompressionCodec getGzipCodec(Configuration conf) {
+ try {
+ return (CompressionCodec) ReflectionUtils.newInstance(
+ conf.getClassByName("org.apache.hadoop.io.compress.GzipCodec")
+ .asSubclass(CompressionCodec.class),
+ conf);
+ } catch (ClassNotFoundException e) {
+ logger.warn("GzipCodec could not be instantiated", e);
+ return null;
+ }
+ }
+
+ /**
+ * Creates an output segment file and sets up the output streams to point at it.
+ * If the file already exists, retries with a different filename. This is a bit nasty --
+ * after all, {@link FileOutputFormat}'s work directory concept is supposed to prevent
+ * filename clashes -- but it looks like Amazon Elastic MapReduce prevents use of per-task
+ * work directories if the output of a job is on S3.
+ *
+ * TODO: Investigate this and find a better solution.
+ */
+ private void createSegment() throws IOException {
+ segmentsAttempted = 0;
+ bytesWritten = 0;
+ boolean success = false;
+
+ while (!success) {
+ Path path = workOutputPath.suffix(String.format(extensionFormat, segmentsCreated,
+ segmentsAttempted));
+ FileSystem fs = path.getFileSystem(conf);
+
+ try {
+ // The o.a.h.mapred OutputFormats overwrite existing files, whereas
+ // the o.a.h.mapreduce OutputFormats don't overwrite. Bizarre...
+ // Here, overwrite if progress != null, i.e. if using mapred API.
+ FSDataOutputStream fsStream = (progress == null) ? fs.create(path, false) :
+ fs.create(path, progress);
+ byteStream = new CountingOutputStream(new BufferedOutputStream(fsStream));
+ dataStream = new DataOutputStream(codec == null ? byteStream :
+ codec.createOutputStream(byteStream));
+ segmentsCreated++;
+ logger.info("Writing to output file: {}", path);
+ success = true;
+
+ } catch (IOException e) {
+ if (e.getMessage().startsWith("File already exists")) {
+ logger.warn("Tried to create file {} but it already exists; retrying.", path);
+ segmentsAttempted++; // retry
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * Appends a {@link WARCRecord} to the file, in WARC/1.0 format.
+ * @param record The record to be written.
+ */
+ public void write(WARCRecord record) throws IOException {
+ if (bytesWritten > maxSegmentSize) {
+ dataStream.close();
+ createSegment();
+ }
+ record.write(dataStream);
+ }
+
+ /**
+ * Appends a {@link WARCRecord} wrapped in a {@link WARCWritable} to the file.
+ * @param record The wrapper around the record to be written.
+ */
+ public void write(WARCWritable record) throws IOException {
+ if (record.getRecord() != null) {
+ write(record.getRecord());
+ }
+ }
+
+ /**
+ * Flushes any buffered data and closes the file.
+ */
+ public void close() throws IOException {
+ dataStream.close();
+ }
+
+ private class CountingOutputStream extends FilterOutputStream {
+ public CountingOutputStream(OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ bytesWritten += len;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ bytesWritten++;
+ }
+
+ // Overriding close() because FilterOutputStream's close() method pre-JDK8 has bad behavior:
+ // it silently ignores any exception thrown by flush(). Instead, just close the delegate
+ // stream. It should flush itself if necessary. (Thanks to the Guava project for noticing
+ // this.)
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+ }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCInputFormat.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCInputFormat.java
new file mode 100644
index 00000000000..5d5f38831d1
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCInputFormat.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2014 Martin Kleppmann
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+package org.apache.hadoop.hbase.test.util.warc;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * Hadoop InputFormat for mapreduce jobs ('new' API) that want to process data in WARC files.
+ *
+ * Usage:
+ *
+ * ```java
+ * Job job = new Job(getConf());
+ * job.setInputFormatClass(WARCInputFormat.class);
+ * ```
+ *
+ * Mappers should use a key of {@link org.apache.hadoop.io.LongWritable} (which is
+ * 1 for the first record in a file, 2 for the second record, etc.) and a value of
+ * {@link WARCWritable}.
+ */
+public class WARCInputFormat extends FileInputFormat<LongWritable, WARCWritable> {
+
+ /**
+ * Opens a WARC file (possibly compressed) for reading, and returns a RecordReader for
+ * accessing it.
+ */
+ @Override
+  public RecordReader<LongWritable, WARCWritable> createRecordReader(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new WARCReader();
+ }
+
+ /**
+ * Always returns false, as WARC files cannot be split.
+ */
+ @Override
+ protected boolean isSplitable(JobContext context, Path filename) {
+ return false;
+ }
+
+  private static class WARCReader extends RecordReader<LongWritable, WARCWritable> {
+ private final LongWritable key = new LongWritable();
+ private final WARCWritable value = new WARCWritable();
+ private WARCFileReader reader;
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ reader = new WARCFileReader(context.getConfiguration(), ((FileSplit) split).getPath());
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ try {
+ WARCRecord record = reader.read();
+ key.set(reader.getRecordsRead());
+ value.setRecord(record);
+ return true;
+ } catch (EOFException eof) {
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return reader.getProgress();
+ }
+
+ @Override
+ public LongWritable getCurrentKey() throws IOException, InterruptedException {
+ return key;
+ }
+
+ @Override
+ public WARCWritable getCurrentValue() throws IOException, InterruptedException {
+ return value;
+ }
+ }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCOutputFormat.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCOutputFormat.java
new file mode 100644
index 00000000000..52d505cf916
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCOutputFormat.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2014 Martin Kleppmann
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+package org.apache.hadoop.hbase.test.util.warc;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Hadoop OutputFormat for mapreduce jobs ('new' API) that want to write data to WARC files.
+ *
+ * Usage:
+ *
+ * ```java
+ * Job job = new Job(getConf());
+ * job.setOutputFormatClass(WARCOutputFormat.class);
+ * job.setOutputKeyClass(NullWritable.class);
+ * job.setOutputValueClass(WARCWritable.class);
+ * FileOutputFormat.setCompressOutput(job, true);
+ * ```
+ *
+ * The tasks generating the output (usually the reducers, but may be the mappers if there
+ * are no reducers) should use `NullWritable.get()` as the output key, and the
+ * {@link WARCWritable} as the output value.
+ */
+public class WARCOutputFormat extends FileOutputFormat<NullWritable, WARCWritable> {
+
+ /**
+ * Creates a new output file in WARC format, and returns a RecordWriter for writing to it.
+ */
+ @Override
+  public RecordWriter<NullWritable, WARCWritable> getRecordWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new WARCWriter(context);
+ }
+
+  private class WARCWriter extends RecordWriter<NullWritable, WARCWritable> {
+ private final WARCFileWriter writer;
+
+ public WARCWriter(TaskAttemptContext context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ CompressionCodec codec =
+ getCompressOutput(context) ? WARCFileWriter.getGzipCodec(conf) : null;
+ Path workFile = getDefaultWorkFile(context, "");
+ this.writer = new WARCFileWriter(conf, codec, workFile);
+ }
+
+ @Override
+ public void write(NullWritable key, WARCWritable value)
+ throws IOException, InterruptedException {
+ writer.write(value);
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ writer.close();
+ }
+ }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCRecord.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCRecord.java
new file mode 100644
index 00000000000..b2ff85bc933
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCRecord.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2014 Martin Kleppmann
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+package org.apache.hadoop.hbase.test.util.warc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Immutable implementation of a record in a WARC file. You create a {@link WARCRecord}
+ * by parsing it out of a {@link DataInput} stream.
+ *
+ * The file format is documented in the
+ * [ISO Standard](http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf).
+ * In a nutshell, it's a textual format consisting of lines delimited by `\r\n`.
+ * Each record has the following structure:
+ *
+ * 1. A line indicating the WARC version number, such as `WARC/1.0`.
+ * 2. Several header lines (in key-value format, similar to HTTP or email headers),
+ * giving information about the record. The header is terminated by an empty line.
+ * 3. A body consisting of raw bytes (the number of bytes is indicated in one of the headers).
+ * 4. A final separator of `\r\n\r\n` before the next record starts.
+ *
+ * There are various different types of records, as documented on
+ * {@link Header#getRecordType()}.
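+ *
+ * As a minimal sketch of reading one record outside of mapreduce (the file name and the
+ * use of {@link java.io.DataInputStream} are illustrative assumptions):
+ *
+ * ```java
+ * try (DataInputStream in = new DataInputStream(new FileInputStream("example.warc"))) {
+ *   WARCRecord record = new WARCRecord(in);
+ *   System.out.println(record.getHeader().getTargetURI());
+ * }
+ * ```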
+ */
+public class WARCRecord {
+
+ public static final String WARC_VERSION = "WARC/1.0";
+ private static final Pattern VERSION_PATTERN = Pattern.compile("WARC/[0-9\\.]+");
+ private static final Pattern CONTINUATION_PATTERN = Pattern.compile("^[\\t ]+.*");
+ private static final String CRLF = "\r\n";
+ private static final byte[] CRLF_BYTES = { 13, 10 };
+
+ private final Header header;
+ private final byte[] content;
+
+ /**
+ * Creates a new WARCRecord by parsing it out of a {@link DataInput} stream.
+ * @param in The input source from which one record will be read.
+ */
+ public WARCRecord(DataInput in) throws IOException {
+ header = readHeader(in);
+ content = new byte[header.getContentLength()];
+ in.readFully(content);
+ readSeparator(in);
+ }
+
+ private static Header readHeader(DataInput in) throws IOException {
+ String versionLine = readLine(in);
+ if (!VERSION_PATTERN.matcher(versionLine).matches()) {
+ throw new IllegalStateException("Expected WARC version, but got: " + versionLine);
+ }
+
+ Map<String, String> headers = new LinkedHashMap<>();
+ String line, fieldName = null;
+
+ do {
+ line = readLine(in);
+ if (fieldName != null && CONTINUATION_PATTERN.matcher(line).matches()) {
+ headers.put(fieldName, headers.get(fieldName) + line);
+ } else if (!line.isEmpty()) {
+ String[] field = line.split(":", 2);
+ if (field.length < 2) {
+ throw new IllegalStateException("Malformed header line: " + line);
+ }
+ fieldName = field[0].trim();
+ headers.put(fieldName, field[1].trim());
+ }
+ } while (!line.isEmpty());
+
+ return new Header(headers);
+ }
+
+ private static String readLine(DataInput in) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ boolean seenCR = false, seenCRLF = false;
+ while (!seenCRLF) {
+ byte b = in.readByte();
+ if (!seenCR && b == 13) {
+ seenCR = true;
+ } else if (seenCR && b == 10) {
+ seenCRLF = true;
+ } else {
+ seenCR = false;
+ out.write(b);
+ }
+ }
+ return out.toString("UTF-8");
+ }
+
+ private static void readSeparator(DataInput in) throws IOException {
+ byte[] sep = new byte[4];
+ in.readFully(sep);
+ if (sep[0] != 13 || sep[1] != 10 || sep[2] != 13 || sep[3] != 10) {
+ throw new IllegalStateException(String.format(
+ "Expected final separator CR LF CR LF, but got: %d %d %d %d",
+ sep[0], sep[1], sep[2], sep[3]));
+ }
+ }
+
+ /**
+ * Returns the parsed header structure of the WARC record.
+ */
+ public Header getHeader() {
+ return header;
+ }
+
+ /**
+ * Returns the body of the record, as an unparsed raw array of bytes. The content
+ * of the body depends on the type of record (see {@link Header#getRecordType()}).
+ * For example, in the case of a `response` type header, the body consists of the
+ * full HTTP response returned by the server (HTTP headers followed by the body).
+ */
+ public byte[] getContent() {
+ return content;
+ }
+
+ /**
+ * Writes this record to a {@link DataOutput} stream. In some edge cases the output may
+ * not be byte-for-byte identical to what was parsed from a {@link DataInput}, but it has
+ * the same meaning and should not lose any information.
+ * @param out The output stream to which this record should be appended.
+ */
+ public void write(DataOutput out) throws IOException {
+ header.write(out);
+ out.write(CRLF_BYTES);
+ out.write(content);
+ out.write(CRLF_BYTES);
+ out.write(CRLF_BYTES);
+ }
+
+ /**
+ * Returns a human-readable string representation of the record.
+ */
+ @Override
+ public String toString() {
+ return header.toString();
+ }
+
+ /**
+ * Contains the parsed headers of a {@link WARCRecord}. Each record contains a number
+ * of headers in key-value format, where some header keys are standardised, but
+ * nonstandard ones can be added.
+ *
+ * The documentation of the methods in this class is excerpted from the
+ * [WARC 1.0 specification](http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf).
+ * Please see the specification for more detail.
+ */
+ public static final class Header {
+ private final Map<String, String> fields;
+
+ private Header(Map<String, String> fields) {
+ this.fields = fields;
+ }
+
+ /**
+ * Returns the type of WARC record (the value of the `WARC-Type` header field).
+ * WARC 1.0 defines the following record types: (for full definitions, see the
+ * [spec](http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf))
+ *
+ * * `warcinfo`: Describes the records that follow it, up through end of file,
+ * end of input, or until next `warcinfo` record. Typically, this appears once and
+ * at the beginning of a WARC file. For a web archive, it often contains information
+ * about the web crawl which generated the following records.
+ *
+ * The format of this descriptive record block may vary, though the use of the
+ * `"application/warc-fields"` content-type is recommended. (...)
+ *
+ * * `response`: The record should contain a complete scheme-specific response, including
+ * network protocol information where possible. For a target-URI of the `http` or
+ * `https` schemes, a `response` record block should contain the full HTTP
+ * response received over the network, including headers. That is, it contains the
+ * 'Response' message defined by section 6 of HTTP/1.1 (RFC2616).
+ *
+ * The WARC record's Content-Type field should contain the value defined by HTTP/1.1,
+ * `"application/http;msgtype=response"`. The payload of the record is defined as its
+ * 'entity-body' (per RFC2616), with any transfer-encoding removed.
+ *
+ * * `resource`: The record contains a resource, without full protocol response
+ * information. For example: a file directly retrieved from a locally accessible
+ * repository or the result of a networked retrieval where the protocol information
+ * has been discarded. For a target-URI of the `http` or `https` schemes, a `resource`
+ * record block shall contain the returned 'entity-body' (per RFC2616, with any
+ * transfer-encodings removed), possibly truncated.
+ *
+ * * `request`: The record holds the details of a complete scheme-specific request,
+ * including network protocol information where possible. For a target-URI of the
+ * `http` or `https` schemes, a `request` record block should contain the full HTTP
+ * request sent over the network, including headers. That is, it contains the
+ * 'Request' message defined by section 5 of HTTP/1.1 (RFC2616).
+ *
+ * The WARC record's Content-Type field should contain the value defined by HTTP/1.1,
+ * `"application/http;msgtype=request"`. The payload of a `request` record with a
+ * target-URI of scheme `http` or `https` is defined as its 'entity-body' (per
+ * RFC2616), with any transfer-encoding removed.
+ *
+ * * `metadata`: The record contains content created in order to further describe,
+ * explain, or accompany a harvested resource, in ways not covered by other record
+ * types. A `metadata` record will almost always refer to another record of another
+ * type, with that other record holding original harvested or transformed content.
+ *
+ * The format of the metadata record block may vary. The `"application/warc-fields"`
+ * format may be used.
+ *
+ * * `revisit`: The record describes the revisitation of content already archived,
+ * and might include only an abbreviated content body which has to be interpreted
+ * relative to a previous record. Most typically, a `revisit` record is used
+ * instead of a `response` or `resource` record to indicate that the content
+ * visited was either a complete or substantial duplicate of material previously
+ * archived.
+ *
+ * A `revisit` record shall contain a WARC-Profile field which determines the
+ * interpretation of the record's fields and record block. Please see the
+ * specification for details.
+ *
+ * * `conversion`: The record shall contain an alternative version of another
+ * record's content that was created as the result of an archival process.
+ * Typically, this is used to hold content transformations that maintain viability
+ * of content after widely available rendering tools for the originally stored
+ * format disappear. As needed, the original content may be migrated (transformed)
+ * to a more viable format in order to keep the information usable with current
+ * tools while minimizing loss of information.
+ *
+ * * `continuation`: Record blocks from `continuation` records must be appended to
+ * corresponding prior record blocks (e.g. from other WARC files) to create the
+ * logically complete full-sized original record. That is, `continuation`
+ * records are used when a record that would otherwise cause a WARC file size to
+ * exceed a desired limit is broken into segments. A continuation record shall
+ * contain the named fields `WARC-Segment-Origin-ID` and `WARC-Segment-Number`,
+ * and the last `continuation` record of a series shall contain a
+ * `WARC-Segment-Total-Length` field. Please see the specification for details.
+ *
+ * * Other record types may be added in future, so this list is not exclusive.
+ *
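+ * For instance, a job that only cares about archived HTTP responses might filter on this
+ * field; in this illustrative sketch, `record` is a {@link WARCRecord} read from the input:
+ *
+ * ```java
+ * if ("response".equals(record.getHeader().getRecordType())) {
+ *   byte[] httpResponse = record.getContent();
+ *   // parse HTTP headers and entity body out of httpResponse
+ * }
+ * ```
+ *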
+ * @return The record's `WARC-Type` header field, as a string.
+ */
+ public String getRecordType() {
+ return fields.get("WARC-Type");
+ }
+
+ /**
+ * A 14-digit UTC timestamp formatted according to YYYY-MM-DDThh:mm:ssZ, described
+ * in the W3C profile of ISO8601. The timestamp shall represent the instant that
+ * data capture for record creation began. Multiple records written as part of a
+ * single capture event shall use the same WARC-Date, even though the times of
+ * their writing will not be exactly synchronized.
+ *
+ * @return The record's `WARC-Date` header field, as a string.
+ */
+ public String getDateString() {
+ return fields.get("WARC-Date");
+ }
+
+ /**
+ * An identifier assigned to the current record that is globally unique for its
+ * period of intended use. No identifier scheme is mandated by this specification,
+ * but each record-id shall be a legal URI and clearly indicate a documented and
+ * registered scheme to which it conforms (e.g., via a URI scheme prefix such as
+ * `http:` or `urn:`).
+ *
+ * @return The record's `WARC-Record-ID` header field, as a string.
+ */
+ public String getRecordID() {
+ return fields.get("WARC-Record-ID");
+ }
+
+ /**
+ * The MIME type (RFC2045) of the information contained in the record's block. For
+ * example, in HTTP request and response records, this would be `application/http`
+ * as per section 19.1 of RFC2616 (or `application/http; msgtype=request` and
+ * `application/http; msgtype=response` respectively).
+ *
+ * In particular, the content-type is *not* the value of the HTTP Content-Type
+ * header in an HTTP response, but a MIME type to describe the full archived HTTP
+ * message (hence `application/http` if the block contains request or response
+ * headers).
+ *
+ * @return The record's `Content-Type` header field, as a string.
+ */
+ public String getContentType() {
+ return fields.get("Content-Type");
+ }
+
+ /**
+ * The original URI whose capture gave rise to the information content in this record.
+ * In the context of web harvesting, this is the URI that was the target of a
+ * crawler's retrieval request. For a `revisit` record, it is the URI that was the
+ * target of a retrieval request. Indirectly, such as for a `metadata`, or `conversion`
+ * record, it is a copy of the `WARC-Target-URI` appearing in the original record to
+ * which the newer record pertains. The URI in this value shall be properly escaped
+ * according to RFC3986, and written with no internal whitespace.
+ *
+ * @return The record's `WARC-Target-URI` header field, as a string.
+ */
+ public String getTargetURI() {
+ return fields.get("WARC-Target-URI");
+ }
+
+ /**
+ * The number of bytes in the body of the record, similar to RFC2616.
+ *
+ * @return The record's `Content-Length` header field, parsed into an int.
+ */
+ public int getContentLength() {
+ String lengthStr = fields.get("Content-Length");
+ if (lengthStr == null) {
+ throw new IllegalStateException("Missing Content-Length header");
+ }
+ try {
+ return Integer.parseInt(lengthStr);
+ } catch (NumberFormatException e) {
+ throw new IllegalStateException("Malformed Content-Length header: " + lengthStr);
+ }
+ }
+
+ /**
+ * Returns the value of a selected header field, or null if there is no header with
+ * that field name.
+ * @param field The name of the header to return (case-sensitive).
+ * @return The value associated with that field name, or null if not present.
+ */
+ public String getField(String field) {
+ return fields.get(field);
+ }
+
+ /**
+ * Appends this header to a {@link DataOutput} stream, in WARC/1.0 format.
+ * @param out The data output to which the header should be written.
+ */
+ public void write(DataOutput out) throws IOException {
+ out.write(toString().getBytes("UTF-8"));
+ }
+
+ /**
+ * Formats this header in WARC/1.0 format, consisting of a version line followed
+ * by colon-delimited key-value pairs, and `\r\n` line endings.
+ */
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append(WARC_VERSION);
+ buf.append(CRLF);
+ for (Map.Entry<String, String> field : fields.entrySet()) {
+ buf.append(field.getKey());
+ buf.append(": ");
+ buf.append(field.getValue());
+ buf.append(CRLF);
+ }
+ return buf.toString();
+ }
+ }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCWritable.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCWritable.java
new file mode 100644
index 00000000000..bf5fc6b1fd1
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/util/warc/WARCWritable.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2014 Martin Kleppmann
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+package org.apache.hadoop.hbase.test.util.warc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A mutable wrapper around a {@link WARCRecord} implementing the Hadoop Writable interface.
+ * This allows WARC records to be used throughout Hadoop (e.g. written to sequence files
+ * when shuffling data between mappers and reducers). The record is encoded as a single
+ * record in standard WARC/1.0 format.
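+ *
+ * In a mapreduce job reading WARC files (for example via {@link WARCInputFormat}), map tasks
+ * typically receive values of this type. A hedged sketch of such a mapper body, assuming
+ * `LongWritable` keys:
+ *
+ * ```java
+ * public void map(LongWritable key, WARCWritable value, Context context)
+ *     throws IOException, InterruptedException {
+ *   WARCRecord record = value.getRecord();
+ *   // inspect record.getHeader() and record.getContent() as needed
+ * }
+ * ```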
+ */
+public class WARCWritable implements Writable {
+
+ private WARCRecord record;
+
+ /** Creates an empty writable (with a null record). */
+ public WARCWritable() {
+ this.record = null;
+ }
+
+ /** Creates a writable wrapper around a given WARCRecord. */
+ public WARCWritable(WARCRecord record) {
+ this.record = record;
+ }
+
+ /** Returns the record currently wrapped by this writable. */
+ public WARCRecord getRecord() {
+ return record;
+ }
+
+ /** Updates the record held within this writable wrapper. */
+ public void setRecord(WARCRecord record) {
+ this.record = record;
+ }
+
+ /** Appends the current record to a {@link DataOutput} stream. */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if (record != null) {
+ record.write(out);
+ }
+ }
+
+ /**
+ * Parses a {@link WARCRecord} out of a {@link DataInput} stream, and makes it the
+ * writable's current record.
+ */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ record = new WARCRecord(in);
+ }
+
+}
diff --git a/hbase-it/src/test/resources/CC-MAIN-2021-10-warc.paths.gz b/hbase-it/src/test/resources/CC-MAIN-2021-10-warc.paths.gz
new file mode 100644
index 00000000000..100dd3e7121
Binary files /dev/null and b/hbase-it/src/test/resources/CC-MAIN-2021-10-warc.paths.gz differ
diff --git a/pom.xml b/pom.xml
index b97a7ab04ce..e6bd3c990a7 100755
--- a/pom.xml
+++ b/pom.xml
@@ -769,6 +769,8 @@
 <exclude>**/shaded/com/google/protobuf/**</exclude>
 <exclude>**/src/main/patches/**</exclude>
+
+ <exclude>**/CC-MAIN-2021-10-warc.paths.gz</exclude>