HBASE-27904: A random data generator tool
This commit is contained in:
parent
d539917514
commit
9b89ccb6a9
|
@ -0,0 +1,87 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.util.bulkdatagenerator;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import org.apache.hadoop.io.NullWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.mapreduce.InputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.InputSplit;
|
||||||
|
import org.apache.hadoop.mapreduce.JobContext;
|
||||||
|
import org.apache.hadoop.mapreduce.RecordReader;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
public class BulkDataGeneratorInputFormat extends InputFormat<Text, NullWritable> {
|
||||||
|
|
||||||
|
public static final String MAPPER_TASK_COUNT_KEY =
|
||||||
|
BulkDataGeneratorInputFormat.class.getName() + "mapper.task.count";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<InputSplit> getSplits(JobContext job) throws IOException {
|
||||||
|
// Get the number of mapper tasks configured
|
||||||
|
int mapperCount = job.getConfiguration().getInt(MAPPER_TASK_COUNT_KEY, -1);
|
||||||
|
Preconditions.checkArgument(mapperCount > 1, MAPPER_TASK_COUNT_KEY + " is not set.");
|
||||||
|
|
||||||
|
// Create a number of input splits equal to the number of mapper tasks
|
||||||
|
ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
|
||||||
|
for (int i = 0; i < mapperCount; ++i) {
|
||||||
|
splits.add(new FakeInputSplit());
|
||||||
|
}
|
||||||
|
return splits;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RecordReader<Text, NullWritable> createRecordReader(InputSplit split,
|
||||||
|
TaskAttemptContext context) throws IOException, InterruptedException {
|
||||||
|
BulkDataGeneratorRecordReader bulkDataGeneratorRecordReader =
|
||||||
|
new BulkDataGeneratorRecordReader();
|
||||||
|
bulkDataGeneratorRecordReader.initialize(split, context);
|
||||||
|
return bulkDataGeneratorRecordReader;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dummy input split to be used by {@link BulkDataGeneratorRecordReader}
|
||||||
|
*/
|
||||||
|
private static class FakeInputSplit extends InputSplit implements Writable {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFields(DataInput arg0) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(DataOutput arg0) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLength() throws IOException, InterruptedException {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getLocations() throws IOException, InterruptedException {
|
||||||
|
return new String[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,138 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.util.bulkdatagenerator;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
import org.apache.commons.math3.util.Pair;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.io.NullWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapreduce.Mapper;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
public class BulkDataGeneratorMapper
|
||||||
|
extends Mapper<Text, NullWritable, ImmutableBytesWritable, KeyValue> {
|
||||||
|
|
||||||
|
/** Counter enumeration to count number of rows generated. */
|
||||||
|
public static enum Counters {
|
||||||
|
ROWS_GENERATED
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final String SPLIT_COUNT_KEY =
|
||||||
|
BulkDataGeneratorMapper.class.getName() + "split.count";
|
||||||
|
|
||||||
|
private static final String ORG_ID = "00D000000000062";
|
||||||
|
private static final int MAX_EVENT_ID = Integer.MAX_VALUE;
|
||||||
|
private static final int MAX_VEHICLE_ID = 100;
|
||||||
|
private static final int MAX_SPEED_KPH = 140;
|
||||||
|
private static final int NUM_LOCATIONS = 10;
|
||||||
|
private static int splitCount = 1;
|
||||||
|
private static final Random random = new Random(System.currentTimeMillis());
|
||||||
|
private static final Map<String, Pair<BigDecimal, BigDecimal>> LOCATIONS =
|
||||||
|
Maps.newHashMapWithExpectedSize(NUM_LOCATIONS);
|
||||||
|
private static final List<String> LOCATION_KEYS = Lists.newArrayListWithCapacity(NUM_LOCATIONS);
|
||||||
|
static {
|
||||||
|
LOCATIONS.put("Belém", new Pair<>(BigDecimal.valueOf(-01.45), BigDecimal.valueOf(-48.48)));
|
||||||
|
LOCATIONS.put("Brasília", new Pair<>(BigDecimal.valueOf(-15.78), BigDecimal.valueOf(-47.92)));
|
||||||
|
LOCATIONS.put("Campinas", new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-47.05)));
|
||||||
|
LOCATIONS.put("Cuiaba", new Pair<>(BigDecimal.valueOf(-07.25), BigDecimal.valueOf(-58.42)));
|
||||||
|
LOCATIONS.put("Manaus", new Pair<>(BigDecimal.valueOf(-03.10), BigDecimal.valueOf(-60.00)));
|
||||||
|
LOCATIONS.put("Porto Velho",
|
||||||
|
new Pair<>(BigDecimal.valueOf(-08.75), BigDecimal.valueOf(-63.90)));
|
||||||
|
LOCATIONS.put("Recife", new Pair<>(BigDecimal.valueOf(-08.10), BigDecimal.valueOf(-34.88)));
|
||||||
|
LOCATIONS.put("Rio de Janeiro",
|
||||||
|
new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-43.23)));
|
||||||
|
LOCATIONS.put("Santarém", new Pair<>(BigDecimal.valueOf(-02.43), BigDecimal.valueOf(-54.68)));
|
||||||
|
LOCATIONS.put("São Paulo", new Pair<>(BigDecimal.valueOf(-23.53), BigDecimal.valueOf(-46.62)));
|
||||||
|
LOCATION_KEYS.addAll(LOCATIONS.keySet());
|
||||||
|
}
|
||||||
|
|
||||||
|
final static byte[] COLUMN_FAMILY_BYTES = Utility.COLUMN_FAMILY.getBytes();
|
||||||
|
|
||||||
|
/** {@inheritDoc} */
|
||||||
|
@Override
|
||||||
|
protected void setup(Context context) throws IOException, InterruptedException {
|
||||||
|
Configuration c = context.getConfiguration();
|
||||||
|
splitCount = c.getInt(SPLIT_COUNT_KEY, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates a single record based on value set to the key by
|
||||||
|
* {@link BulkDataGeneratorRecordReader#getCurrentKey()}.
|
||||||
|
* {@link Utility.TableColumnNames#TOOL_EVENT_ID} is first part of row key. Keeping first
|
||||||
|
* {@link Utility#SPLIT_PREFIX_LENGTH} characters as index of the record to be generated ensures
|
||||||
|
* that records are equally distributed across all regions of the table since region boundaries
|
||||||
|
* are generated in similar fashion. Check {@link Utility#createTable(Admin, String, int, Map)}
|
||||||
|
* method for region split info.
|
||||||
|
* @param key - The key having index of next record to be generated
|
||||||
|
* @param value - Value associated with the key (not used)
|
||||||
|
* @param context - Context of the mapper container
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void map(Text key, NullWritable value, Context context)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
int recordIndex = Integer.parseInt(key.toString());
|
||||||
|
|
||||||
|
// <6-characters-for-region-boundary-prefix>_<15-random-characters>_<record-index-for-this-mapper-task>
|
||||||
|
final String toolEventId =
|
||||||
|
String.format("%0" + Utility.SPLIT_PREFIX_LENGTH + "d", recordIndex % (splitCount + 1)) + "_"
|
||||||
|
+ EnvironmentEdgeManager.currentTime() + (1e14 + (random.nextFloat() * 9e13)) + "_"
|
||||||
|
+ recordIndex;
|
||||||
|
final String eventId = String.valueOf(Math.abs(random.nextInt(MAX_EVENT_ID)));
|
||||||
|
final String vechileId = String.valueOf(Math.abs(random.nextInt(MAX_VEHICLE_ID)));
|
||||||
|
final String speed = String.valueOf(Math.abs(random.nextInt(MAX_SPEED_KPH)));
|
||||||
|
final String location = LOCATION_KEYS.get(random.nextInt(NUM_LOCATIONS));
|
||||||
|
final Pair<BigDecimal, BigDecimal> coordinates = LOCATIONS.get(location);
|
||||||
|
final BigDecimal latitude = coordinates.getFirst();
|
||||||
|
final BigDecimal longitude = coordinates.getSecond();
|
||||||
|
|
||||||
|
final ImmutableBytesWritable hKey =
|
||||||
|
new ImmutableBytesWritable(String.format("%s:%s", toolEventId, ORG_ID).getBytes());
|
||||||
|
addKeyValue(context, hKey, Utility.TableColumnNames.ORG_ID, ORG_ID);
|
||||||
|
addKeyValue(context, hKey, Utility.TableColumnNames.TOOL_EVENT_ID, toolEventId);
|
||||||
|
addKeyValue(context, hKey, Utility.TableColumnNames.EVENT_ID, eventId);
|
||||||
|
addKeyValue(context, hKey, Utility.TableColumnNames.VEHICLE_ID, vechileId);
|
||||||
|
addKeyValue(context, hKey, Utility.TableColumnNames.SPEED, speed);
|
||||||
|
addKeyValue(context, hKey, Utility.TableColumnNames.LATITUDE, latitude.toString());
|
||||||
|
addKeyValue(context, hKey, Utility.TableColumnNames.LONGITUDE, longitude.toString());
|
||||||
|
addKeyValue(context, hKey, Utility.TableColumnNames.LOCATION, location);
|
||||||
|
addKeyValue(context, hKey, Utility.TableColumnNames.TIMESTAMP,
|
||||||
|
String.valueOf(EnvironmentEdgeManager.currentTime()));
|
||||||
|
|
||||||
|
context.getCounter(Counters.ROWS_GENERATED).increment(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addKeyValue(final Context context, ImmutableBytesWritable key,
|
||||||
|
final Utility.TableColumnNames columnName, final String value)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
KeyValue kv =
|
||||||
|
new KeyValue(key.get(), COLUMN_FAMILY_BYTES, columnName.getColumnName(), value.getBytes());
|
||||||
|
context.write(key, kv);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,75 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.util.bulkdatagenerator;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.io.NullWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapreduce.InputSplit;
|
||||||
|
import org.apache.hadoop.mapreduce.RecordReader;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
public class BulkDataGeneratorRecordReader extends RecordReader<Text, NullWritable> {
|
||||||
|
|
||||||
|
private int numRecordsToCreate = 0;
|
||||||
|
private int createdRecords = 0;
|
||||||
|
private Text key = new Text();
|
||||||
|
private NullWritable value = NullWritable.get();
|
||||||
|
|
||||||
|
public static final String RECORDS_PER_MAPPER_TASK_KEY =
|
||||||
|
BulkDataGeneratorInputFormat.class.getName() + "records.per.mapper.task";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize(InputSplit split, TaskAttemptContext context)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
// Get the number of records to create from the configuration
|
||||||
|
this.numRecordsToCreate = context.getConfiguration().getInt(RECORDS_PER_MAPPER_TASK_KEY, -1);
|
||||||
|
Preconditions.checkArgument(numRecordsToCreate > 0,
|
||||||
|
"Number of records to be created by per mapper should be greater than 0.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean nextKeyValue() {
|
||||||
|
createdRecords++;
|
||||||
|
return createdRecords <= numRecordsToCreate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Text getCurrentKey() {
|
||||||
|
// Set the index of record to be created
|
||||||
|
key.set(String.valueOf(createdRecords));
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NullWritable getCurrentValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getProgress() throws IOException, InterruptedException {
|
||||||
|
return (float) createdRecords / (float) numRecordsToCreate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,285 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.util.bulkdatagenerator;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.*;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
|
||||||
|
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
|
||||||
|
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
|
||||||
|
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
|
||||||
|
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
|
||||||
|
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
|
||||||
|
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
|
||||||
|
import org.apache.hbase.thirdparty.org.apache.commons.cli.Parser;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A command line utility to generate pre-splitted HBase Tables with large amount (TBs) of random
|
||||||
|
* data, equally distributed among all regions.
|
||||||
|
*/
|
||||||
|
public class BulkDataGeneratorTool {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(BulkDataGeneratorTool.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prefix for the generated HFiles directory
|
||||||
|
*/
|
||||||
|
private static final String OUTPUT_DIRECTORY_PREFIX = "/bulk_data_generator/";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of mapper container to be launched for generating of HFiles
|
||||||
|
*/
|
||||||
|
private int mapperCount;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of rows to be generated by each mapper
|
||||||
|
*/
|
||||||
|
private long rowsPerMapper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Table for which random data needs to be generated
|
||||||
|
*/
|
||||||
|
private String table;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of splits for the {@link #table}. Number of regions for the table will be
|
||||||
|
* ({@link #splitCount} + 1).
|
||||||
|
*/
|
||||||
|
private int splitCount;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Flag to delete the table (before creating) if it already exists
|
||||||
|
*/
|
||||||
|
private boolean deleteTableIfExist;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Additional HBase meta-data options to be set for the table
|
||||||
|
*/
|
||||||
|
Map<String, String> tableOptions = new HashMap<>();
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
BulkDataGeneratorTool bulkDataGeneratorTool = new BulkDataGeneratorTool();
|
||||||
|
bulkDataGeneratorTool.run(conf, args);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean run(Configuration conf, String[] args) throws IOException {
|
||||||
|
// Read CLI arguments
|
||||||
|
CommandLine line = null;
|
||||||
|
try {
|
||||||
|
Parser parser = new GnuParser();
|
||||||
|
line = parser.parse(getOptions(), args);
|
||||||
|
readCommandLineParameters(conf, line);
|
||||||
|
} catch (ParseException | IOException exception) {
|
||||||
|
logger.error("Error while parsing CLI arguments.", exception);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (line.hasOption("-h")) {
|
||||||
|
printUsage();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
Path outputDirectory = generateOutputDirectory();
|
||||||
|
logger.info("HFiles will be generated at " + outputDirectory.toString());
|
||||||
|
|
||||||
|
try (Connection connection = ConnectionFactory.createConnection(conf)) {
|
||||||
|
final Admin admin = connection.getAdmin();
|
||||||
|
final TableName tableName = TableName.valueOf(table);
|
||||||
|
if (admin.tableExists(tableName)) {
|
||||||
|
if (deleteTableIfExist) {
|
||||||
|
logger.info(
|
||||||
|
"Deleting the table since it already exist and delete-if-exist flag is set to true");
|
||||||
|
Utility.deleteTable(admin, table);
|
||||||
|
} else {
|
||||||
|
logger.info("Table already exists, cannot generate HFiles for existing table.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creating the pre-split table
|
||||||
|
Utility.createTable(admin, table, splitCount, tableOptions);
|
||||||
|
logger.info(table + " created successfully");
|
||||||
|
|
||||||
|
Job job = createSubmittableJob(conf);
|
||||||
|
|
||||||
|
Table hbaseTable = connection.getTable(tableName);
|
||||||
|
|
||||||
|
// Auto configure partitioner and reducer
|
||||||
|
HFileOutputFormat2.configureIncrementalLoad(job, hbaseTable, hbaseTable.getRegionLocator());
|
||||||
|
|
||||||
|
FileOutputFormat.setOutputPath(job, outputDirectory);
|
||||||
|
|
||||||
|
boolean result = job.waitForCompletion(true);
|
||||||
|
|
||||||
|
if (result) {
|
||||||
|
logger.info("HFiles generated successfully. Starting bulk load to " + table);
|
||||||
|
BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(conf);
|
||||||
|
int loadHFilesResult = bulkLoadHFilesTool.run(new String[] {
|
||||||
|
outputDirectory.getFileSystem(conf).makeQualified(outputDirectory).toString(), table });
|
||||||
|
return (loadHFilesResult == 0);
|
||||||
|
} else {
|
||||||
|
logger.info("Failed to generate HFiles.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Failed to generate data", e);
|
||||||
|
return false;
|
||||||
|
} finally {
|
||||||
|
FileSystem.get(conf).deleteOnExit(outputDirectory);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Job createSubmittableJob(Configuration conf) throws IOException {
|
||||||
|
|
||||||
|
conf.setInt(BulkDataGeneratorMapper.SPLIT_COUNT_KEY, splitCount);
|
||||||
|
conf.setInt(BulkDataGeneratorInputFormat.MAPPER_TASK_COUNT_KEY, mapperCount);
|
||||||
|
conf.setLong(BulkDataGeneratorRecordReader.RECORDS_PER_MAPPER_TASK_KEY, rowsPerMapper);
|
||||||
|
|
||||||
|
Job job = new Job(conf, BulkDataGeneratorTool.class.getSimpleName() + " - " + table);
|
||||||
|
|
||||||
|
job.setJarByClass(BulkDataGeneratorMapper.class);
|
||||||
|
job.setInputFormatClass(BulkDataGeneratorInputFormat.class);
|
||||||
|
|
||||||
|
HBaseConfiguration.addHbaseResources(conf);
|
||||||
|
|
||||||
|
job.setMapperClass(BulkDataGeneratorMapper.class);
|
||||||
|
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
|
||||||
|
job.setMapOutputValueClass(KeyValue.class);
|
||||||
|
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns Random output directory path where HFiles will be generated */
|
||||||
|
protected Path generateOutputDirectory() {
|
||||||
|
final String outputDirectory =
|
||||||
|
OUTPUT_DIRECTORY_PREFIX + "/" + table + "-" + System.currentTimeMillis();
|
||||||
|
return new Path(outputDirectory);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method parses the command line parameters into instance variables
|
||||||
|
*/
|
||||||
|
protected void readCommandLineParameters(Configuration conf, CommandLine line)
|
||||||
|
throws ParseException, IOException {
|
||||||
|
final List<String> genericParameters = new ArrayList<String>();
|
||||||
|
|
||||||
|
// Parse the generic options
|
||||||
|
for (Map.Entry<Object, Object> entry : line.getOptionProperties("D").entrySet()) {
|
||||||
|
genericParameters.add("-D");
|
||||||
|
genericParameters.add(entry.getKey() + "=" + entry.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Parsed generic parameters: " + Arrays.toString(genericParameters.toArray(new String[0])));
|
||||||
|
|
||||||
|
new GenericOptionsParser(conf, genericParameters.toArray(new String[0]));
|
||||||
|
|
||||||
|
table = line.getOptionValue("table");
|
||||||
|
Preconditions.checkArgument(!StringUtils.isEmpty(table), "Table name must not be empty");
|
||||||
|
mapperCount = Integer.parseInt(line.getOptionValue("mapper-count"));
|
||||||
|
Preconditions.checkArgument(mapperCount > 0, "Mapper count must be greater than 0");
|
||||||
|
splitCount = Integer.parseInt(line.getOptionValue("split-count"));
|
||||||
|
Preconditions.checkArgument((splitCount > 0) && (splitCount < Utility.MAX_SPLIT_COUNT),
|
||||||
|
"Split count must be greater than 0 and less than " + Utility.MAX_SPLIT_COUNT);
|
||||||
|
rowsPerMapper = Long.parseLong(line.getOptionValue("rows-per-mapper"));
|
||||||
|
Preconditions.checkArgument(rowsPerMapper > 0, "Rows per mapper must be greater than 0");
|
||||||
|
deleteTableIfExist = line.hasOption("delete-if-exist");
|
||||||
|
parseTableOptions(line);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void parseTableOptions(final CommandLine line) {
|
||||||
|
final String tableOptionsAsString = line.getOptionValue("table-options");
|
||||||
|
if (!StringUtils.isEmpty(tableOptionsAsString)) {
|
||||||
|
for (String tableOption : tableOptionsAsString.split(",")) {
|
||||||
|
final String[] keyValueSplit = tableOption.split("=");
|
||||||
|
final String key = keyValueSplit[0];
|
||||||
|
final String value = keyValueSplit[1];
|
||||||
|
tableOptions.put(key, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns the command line option for {@link BulkDataGeneratorTool} */
|
||||||
|
protected Options getOptions() {
|
||||||
|
final Options options = new Options();
|
||||||
|
Option option =
|
||||||
|
new Option("t", "table", true, "The table name for which data need to be generated.");
|
||||||
|
options.addOption(option);
|
||||||
|
|
||||||
|
option = new Option("d", "delete-if-exist", false,
|
||||||
|
"If it's set, the table will be deleted if already exist.");
|
||||||
|
options.addOption(option);
|
||||||
|
|
||||||
|
option =
|
||||||
|
new Option("mc", "mapper-count", true, "The number of mapper containers to be launched.");
|
||||||
|
options.addOption(option);
|
||||||
|
|
||||||
|
option = new Option("sc", "split-count", true,
|
||||||
|
"The number of regions/pre-splits to be created for the table.");
|
||||||
|
options.addOption(option);
|
||||||
|
|
||||||
|
option =
|
||||||
|
new Option("r", "rows-per-mapper", true, "The number of rows to be generated PER mapper.");
|
||||||
|
options.addOption(option);
|
||||||
|
|
||||||
|
option =
|
||||||
|
new Option("o", "table-options", true, "Table options to be set while creating the table.");
|
||||||
|
options.addOption(option);
|
||||||
|
|
||||||
|
option = new Option("h", "help", false, "Show help message for the tool");
|
||||||
|
options.addOption(option);
|
||||||
|
|
||||||
|
return options;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void printUsage() {
|
||||||
|
final HelpFormatter helpFormatter = new HelpFormatter();
|
||||||
|
helpFormatter.setWidth(120);
|
||||||
|
final String helpMessageCommand = "hbase " + BulkDataGeneratorTool.class.getName();
|
||||||
|
final String commandSyntax = helpMessageCommand + " <OPTIONS> [-D<property=value>]*";
|
||||||
|
final String helpMessageSuffix = "Examples:\n" + helpMessageCommand
|
||||||
|
+ " -t TEST_TABLE -mc 10 -r 100 -sc 10\n" + helpMessageCommand
|
||||||
|
+ " -t TEST_TABLE -mc 10 -r 100 -sc 10 -d -o \"DISABLE_BACKUP=true,NORMALIZATION_ENABLED=false\"\n"
|
||||||
|
+ helpMessageCommand
|
||||||
|
+ " -t TEST_TABLE -mc 10 -r 100 -sc 10 -Dmapreduce.map.memory.mb=8192 -Dmapreduce.map.java.opts=-Xmx7782m\n";
|
||||||
|
helpFormatter.printHelp(commandSyntax, "", getOptions(), helpMessageSuffix);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,95 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.util.bulkdatagenerator;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
public class Utility {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schema for HBase table to be generated by generated and populated by
|
||||||
|
* {@link BulkDataGeneratorTool}
|
||||||
|
*/
|
||||||
|
public enum TableColumnNames {
|
||||||
|
ORG_ID("orgId".getBytes()),
|
||||||
|
TOOL_EVENT_ID("toolEventId".getBytes()),
|
||||||
|
EVENT_ID("eventId".getBytes()),
|
||||||
|
VEHICLE_ID("vehicleId".getBytes()),
|
||||||
|
SPEED("speed".getBytes()),
|
||||||
|
LATITUDE("latitude".getBytes()),
|
||||||
|
LONGITUDE("longitude".getBytes()),
|
||||||
|
LOCATION("location".getBytes()),
|
||||||
|
TIMESTAMP("timestamp".getBytes());
|
||||||
|
|
||||||
|
private final byte[] columnName;
|
||||||
|
|
||||||
|
TableColumnNames(byte[] column) {
|
||||||
|
this.columnName = column;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getColumnName() {
|
||||||
|
return this.columnName;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final String COLUMN_FAMILY = "cf";
|
||||||
|
|
||||||
|
public static final int SPLIT_PREFIX_LENGTH = 6;
|
||||||
|
|
||||||
|
public static final int MAX_SPLIT_COUNT = (int) Math.pow(10, SPLIT_PREFIX_LENGTH);
|
||||||
|
|
||||||
|
public static void deleteTable(Admin admin, String tableName) throws IOException {
|
||||||
|
admin.disableTable(TableName.valueOf(tableName));
|
||||||
|
admin.deleteTable(TableName.valueOf(tableName));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a pre-splitted HBase Table having single column family ({@link #COLUMN_FAMILY}) and
|
||||||
|
* sequential splits with {@link #SPLIT_PREFIX_LENGTH} length character prefix. Example: If a
|
||||||
|
* table (TEST_TABLE_1) need to be generated with splitCount as 10, table would be created with
|
||||||
|
* (10+1) regions with boundaries end-keys as (000000-000001, 000001-000002, 000002-000003, ....,
|
||||||
|
* 0000010-)
|
||||||
|
* @param admin - Admin object associated with HBase connection
|
||||||
|
* @param tableName - Name of table to be created
|
||||||
|
* @param splitCount - Number of splits for the table (Number of regions will be splitCount + 1)
|
||||||
|
* @param tableOptions - Additional HBase metadata properties to be set for the table
|
||||||
|
*/
|
||||||
|
public static void createTable(Admin admin, String tableName, int splitCount,
|
||||||
|
Map<String, String> tableOptions) throws IOException {
|
||||||
|
Preconditions.checkArgument(splitCount > 0, "Split count must be greater than 0");
|
||||||
|
TableDescriptorBuilder tableDescriptorBuilder =
|
||||||
|
TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
|
||||||
|
tableOptions.forEach(tableDescriptorBuilder::setValue);
|
||||||
|
TableDescriptor tableDescriptor = tableDescriptorBuilder
|
||||||
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY)).build();
|
||||||
|
// Pre-splitting table based on splitCount
|
||||||
|
byte[][] splitKeys = new byte[splitCount][];
|
||||||
|
for (int i = 0; i < splitCount; i++) {
|
||||||
|
splitKeys[i] = String.format("%0" + Utility.SPLIT_PREFIX_LENGTH + "d", i + 1).getBytes();
|
||||||
|
}
|
||||||
|
admin.createTable(tableDescriptor, splitKeys);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue