Applying spotless
parent 501fc43ea1
commit bef6552eab
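Spotless is the source formatter wired into the HBase build (spotless-maven-plugin). Assuming that standard setup, the mechanical changes below (adding ASF license headers, re-ordering imports, wrapping long lines, and normalizing `if (`/`for (`/`try (` spacing) are what a plain `mvn spotless:apply` run produces; no behavior changes are intended.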
BulkDataGeneratorInputFormat.java

@@ -1,5 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase.util.bulkdatagenerator;

+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;

@@ -8,17 +30,13 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-
 public class BulkDataGeneratorInputFormat extends InputFormat<Text, NullWritable> {

-  public static final String MAPPER_TASK_COUNT_KEY = BulkDataGeneratorInputFormat.class.getName() + "mapper.task.count";
+  public static final String MAPPER_TASK_COUNT_KEY =
+    BulkDataGeneratorInputFormat.class.getName() + "mapper.task.count";

   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {

@@ -35,9 +53,10 @@ public class BulkDataGeneratorInputFormat extends InputFormat<Text, NullWritable
   }

   @Override
-  public RecordReader<Text, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
-    throws IOException, InterruptedException {
-    BulkDataGeneratorRecordReader bulkDataGeneratorRecordReader = new BulkDataGeneratorRecordReader();
+  public RecordReader<Text, NullWritable> createRecordReader(InputSplit split,
+    TaskAttemptContext context) throws IOException, InterruptedException {
+    BulkDataGeneratorRecordReader bulkDataGeneratorRecordReader =
+      new BulkDataGeneratorRecordReader();
     bulkDataGeneratorRecordReader.initialize(split, context);
     return bulkDataGeneratorRecordReader;
   }

@@ -47,17 +66,21 @@ public class BulkDataGeneratorInputFormat extends InputFormat<Text, NullWritable
   */
  private static class FakeInputSplit extends InputSplit implements Writable {

-    @Override public void readFields(DataInput arg0) throws IOException {
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
     }

-    @Override public void write(DataOutput arg0) throws IOException {
+    @Override
+    public void write(DataOutput arg0) throws IOException {
     }

-    @Override public long getLength() throws IOException, InterruptedException {
+    @Override
+    public long getLength() throws IOException, InterruptedException {
       return 0;
     }

-    @Override public String[] getLocations() throws IOException, InterruptedException {
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
       return new String[0];
     }
   }
BulkDataGeneratorMapper.java

@@ -1,5 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase.util.bulkdatagenerator;

+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import org.apache.commons.math3.util.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;

@@ -9,24 +31,20 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;

 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-public class BulkDataGeneratorMapper extends
-    Mapper<Text, NullWritable, ImmutableBytesWritable, KeyValue> {
+public class BulkDataGeneratorMapper
+  extends Mapper<Text, NullWritable, ImmutableBytesWritable, KeyValue> {

   /** Counter enumeration to count number of rows generated. */
   public static enum Counters {
     ROWS_GENERATED
   }

-  public static final String SPLIT_COUNT_KEY = BulkDataGeneratorMapper.class.getName() + "split.count";
+  public static final String SPLIT_COUNT_KEY =
+    BulkDataGeneratorMapper.class.getName() + "split.count";

   private static final String ORG_ID = "00D000000000062";
   private static final int MAX_EVENT_ID = Integer.MAX_VALUE;

@@ -35,7 +53,8 @@ public class BulkDataGeneratorMapper extends
   private static final int NUM_LOCATIONS = 10;
   private static int splitCount = 1;
   private static final Random random = new Random(System.currentTimeMillis());
-  private static final Map<String, Pair<BigDecimal, BigDecimal>> LOCATIONS = Maps.newHashMapWithExpectedSize(NUM_LOCATIONS);
+  private static final Map<String, Pair<BigDecimal, BigDecimal>> LOCATIONS =
+    Maps.newHashMapWithExpectedSize(NUM_LOCATIONS);
   private static final List<String> LOCATION_KEYS = Lists.newArrayListWithCapacity(NUM_LOCATIONS);
   static {
     LOCATIONS.put("Belém", new Pair<>(BigDecimal.valueOf(-01.45), BigDecimal.valueOf(-48.48)));

@@ -43,9 +62,11 @@ public class BulkDataGeneratorMapper extends
     LOCATIONS.put("Campinas", new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-47.05)));
     LOCATIONS.put("Cuiaba", new Pair<>(BigDecimal.valueOf(-07.25), BigDecimal.valueOf(-58.42)));
     LOCATIONS.put("Manaus", new Pair<>(BigDecimal.valueOf(-03.10), BigDecimal.valueOf(-60.00)));
-    LOCATIONS.put("Porto Velho", new Pair<>(BigDecimal.valueOf(-08.75), BigDecimal.valueOf(-63.90)));
+    LOCATIONS.put("Porto Velho",
+      new Pair<>(BigDecimal.valueOf(-08.75), BigDecimal.valueOf(-63.90)));
     LOCATIONS.put("Recife", new Pair<>(BigDecimal.valueOf(-08.10), BigDecimal.valueOf(-34.88)));
-    LOCATIONS.put("Rio de Janeiro", new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-43.23)));
+    LOCATIONS.put("Rio de Janeiro",
+      new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-43.23)));
     LOCATIONS.put("Santarém", new Pair<>(BigDecimal.valueOf(-02.43), BigDecimal.valueOf(-54.68)));
     LOCATIONS.put("São Paulo", new Pair<>(BigDecimal.valueOf(-23.53), BigDecimal.valueOf(-46.62)));
     LOCATION_KEYS.addAll(LOCATIONS.keySet());

@@ -55,29 +76,34 @@ public class BulkDataGeneratorMapper extends

   /** {@inheritDoc} */
   @Override
-  protected void setup(Context context) throws IOException,
-    InterruptedException {
+  protected void setup(Context context) throws IOException, InterruptedException {
     Configuration c = context.getConfiguration();
     splitCount = c.getInt(SPLIT_COUNT_KEY, 1);
   }

   /**
-   * Generates a single record based on value set to the key by {@link BulkDataGeneratorRecordReader#getCurrentKey()}.
-   * {@link Utility.TableColumnNames#TOOL_EVENT_ID} is first part of row key. Keeping first {@link Utility#SPLIT_PREFIX_LENGTH} characters as index of the record to be generated ensures that records are equally distributed across all regions of the table since region boundaries are generated in similar fashion. Check {@link Utility#createTable(Admin, String, int, Map)} method for region split info.
-   * @param key - The key having index of next record to be generated
-   * @param value - Value associated with the key (not used)
+   * Generates a single record based on value set to the key by
+   * {@link BulkDataGeneratorRecordReader#getCurrentKey()}.
+   * {@link Utility.TableColumnNames#TOOL_EVENT_ID} is first part of row key. Keeping first
+   * {@link Utility#SPLIT_PREFIX_LENGTH} characters as index of the record to be generated ensures
+   * that records are equally distributed across all regions of the table since region boundaries
+   * are generated in similar fashion. Check {@link Utility#createTable(Admin, String, int, Map)}
+   * method for region split info.
+   * @param key - The key having index of next record to be generated
+   * @param value - Value associated with the key (not used)
    * @param context - Context of the mapper container
    * @throws IOException
    * @throws InterruptedException
    */
   @Override
   protected void map(Text key, NullWritable value, Context context)
     throws IOException, InterruptedException {

     int recordIndex = Integer.parseInt(key.toString());

     // <6-characters-for-region-boundary-prefix>_<15-random-characters>_<record-index-for-this-mapper-task>
-    final String toolEventId = String.format("%0" + Utility.SPLIT_PREFIX_LENGTH + "d", recordIndex%(splitCount+1)) + "_" + EnvironmentEdgeManager.currentTime() + (1e14 + (random.nextFloat() * 9e13)) + "_" + recordIndex;
+    final String toolEventId =
+      String.format("%0" + Utility.SPLIT_PREFIX_LENGTH + "d", recordIndex % (splitCount + 1)) + "_"
+        + EnvironmentEdgeManager.currentTime() + (1e14 + (random.nextFloat() * 9e13)) + "_"
+        + recordIndex;
     final String eventId = String.valueOf(Math.abs(random.nextInt(MAX_EVENT_ID)));
     final String vechileId = String.valueOf(Math.abs(random.nextInt(MAX_VEHICLE_ID)));
     final String speed = String.valueOf(Math.abs(random.nextInt(MAX_SPEED_KPH)));

@@ -86,7 +112,8 @@ public class BulkDataGeneratorMapper extends
     final BigDecimal latitude = coordinates.getFirst();
     final BigDecimal longitude = coordinates.getSecond();

-    final ImmutableBytesWritable hKey = new ImmutableBytesWritable(String.format("%s:%s", toolEventId, ORG_ID).getBytes());
+    final ImmutableBytesWritable hKey =
+      new ImmutableBytesWritable(String.format("%s:%s", toolEventId, ORG_ID).getBytes());
     addKeyValue(context, hKey, Utility.TableColumnNames.ORG_ID, ORG_ID);
     addKeyValue(context, hKey, Utility.TableColumnNames.TOOL_EVENT_ID, toolEventId);
     addKeyValue(context, hKey, Utility.TableColumnNames.EVENT_ID, eventId);

@@ -95,14 +122,17 @@ public class BulkDataGeneratorMapper extends
     addKeyValue(context, hKey, Utility.TableColumnNames.LATITUDE, latitude.toString());
     addKeyValue(context, hKey, Utility.TableColumnNames.LONGITUDE, longitude.toString());
     addKeyValue(context, hKey, Utility.TableColumnNames.LOCATION, location);
-    addKeyValue(context, hKey, Utility.TableColumnNames.TIMESTAMP, String.valueOf(EnvironmentEdgeManager.currentTime()));
+    addKeyValue(context, hKey, Utility.TableColumnNames.TIMESTAMP,
+      String.valueOf(EnvironmentEdgeManager.currentTime()));

     context.getCounter(Counters.ROWS_GENERATED).increment(1);
   }

-  private void addKeyValue(final Context context, ImmutableBytesWritable key, final Utility.TableColumnNames columnName, final String value)
-    throws IOException, InterruptedException {
-    KeyValue kv = new KeyValue(key.get(), COLUMN_FAMILY_BYTES, columnName.getColumnName(), value.getBytes());
+  private void addKeyValue(final Context context, ImmutableBytesWritable key,
+    final Utility.TableColumnNames columnName, final String value)
+    throws IOException, InterruptedException {
+    KeyValue kv =
+      new KeyValue(key.get(), COLUMN_FAMILY_BYTES, columnName.getColumnName(), value.getBytes());
     context.write(key, kv);
   }
 }
BulkDataGeneratorRecordReader.java

@@ -1,13 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase.util.bulkdatagenerator;

+import java.io.IOException;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

-import java.io.IOException;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-
 public class BulkDataGeneratorRecordReader extends RecordReader<Text, NullWritable> {

@@ -16,14 +33,16 @@ public class BulkDataGeneratorRecordReader extends RecordReader<Text, NullWritab
   private Text key = new Text();
   private NullWritable value = NullWritable.get();

-  public static final String RECORDS_PER_MAPPER_TASK_KEY = BulkDataGeneratorInputFormat.class.getName() + "records.per.mapper.task";
+  public static final String RECORDS_PER_MAPPER_TASK_KEY =
+    BulkDataGeneratorInputFormat.class.getName() + "records.per.mapper.task";

   @Override
   public void initialize(InputSplit split, TaskAttemptContext context)
     throws IOException, InterruptedException {
     // Get the number of records to create from the configuration
     this.numRecordsToCreate = context.getConfiguration().getInt(RECORDS_PER_MAPPER_TASK_KEY, -1);
-    Preconditions.checkArgument(numRecordsToCreate > 0, "Number of records to be created by per mapper should be greater than 0.");
+    Preconditions.checkArgument(numRecordsToCreate > 0,
+      "Number of records to be created by per mapper should be greater than 0.");
   }

   @Override
BulkDataGeneratorTool.java

@@ -1,15 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase.util.bulkdatagenerator;

-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.Parser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;

@@ -25,16 +38,21 @@ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Parser;

 /**
- * A command line utility to generate pre-splitted HBase Tables with large amount (TBs) of random data, equally distributed among all regions.
+ * A command line utility to generate pre-splitted HBase Tables with large amount (TBs) of random
+ * data, equally distributed among all regions.
  */
 public class BulkDataGeneratorTool {

@@ -43,7 +61,7 @@ public class BulkDataGeneratorTool {
   /**
    * Prefix for the generated HFiles directory
    */
-  private static final String OUTPUT_DIRECTORY_PREFIX = "/bulk_data_generator/" ;
+  private static final String OUTPUT_DIRECTORY_PREFIX = "/bulk_data_generator/";

   /**
    * Number of mapper container to be launched for generating of HFiles

@@ -61,7 +79,8 @@ public class BulkDataGeneratorTool {
   private String table;

   /**
-   * Number of splits for the {@link #table}. Number of regions for the table will be ({@link #splitCount} + 1).
+   * Number of splits for the {@link #table}. Number of regions for the table will be
+   * ({@link #splitCount} + 1).
    */
   private int splitCount;

@@ -93,7 +112,7 @@ public class BulkDataGeneratorTool {
       return false;
     }

-    if(line.hasOption("-h")) {
+    if (line.hasOption("-h")) {
       printUsage();
       return true;
     }

@@ -101,12 +120,13 @@ public class BulkDataGeneratorTool {
     Path outputDirectory = generateOutputDirectory();
     logger.info("HFiles will be generated at " + outputDirectory.toString());

-    try(Connection connection = ConnectionFactory.createConnection(conf)) {
+    try (Connection connection = ConnectionFactory.createConnection(conf)) {
       final Admin admin = connection.getAdmin();
       final TableName tableName = TableName.valueOf(table);
-      if(admin.tableExists(tableName)) {
-        if(deleteTableIfExist) {
-          logger.info("Deleting the table since it already exist and delete-if-exist flag is set to true");
+      if (admin.tableExists(tableName)) {
+        if (deleteTableIfExist) {
+          logger.info(
+            "Deleting the table since it already exist and delete-if-exist flag is set to true");
           Utility.deleteTable(admin, table);
         } else {
           logger.info("Table already exists, cannot generate HFiles for existing table.");

@@ -129,10 +149,11 @@ public class BulkDataGeneratorTool {

     boolean result = job.waitForCompletion(true);

-    if(result) {
+    if (result) {
       logger.info("HFiles generated successfully. Starting bulk load to " + table);
       LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(conf);
-      int loadIncrementalResult = loadIncrementalHFiles.run(new String[] {outputDirectory.getFileSystem(conf).makeQualified(outputDirectory).toString(), table});
+      int loadIncrementalResult = loadIncrementalHFiles.run(new String[] {
+        outputDirectory.getFileSystem(conf).makeQualified(outputDirectory).toString(), table });
       return (loadIncrementalResult == 0);
     } else {
       logger.info("Failed to generate HFiles.");

@@ -146,8 +167,7 @@ public class BulkDataGeneratorTool {
     }
   }

-  protected Job createSubmittableJob(Configuration conf)
-    throws IOException {
+  protected Job createSubmittableJob(Configuration conf) throws IOException {

     conf.setInt(BulkDataGeneratorMapper.SPLIT_COUNT_KEY, splitCount);
     conf.setInt(BulkDataGeneratorInputFormat.MAPPER_TASK_COUNT_KEY, mapperCount);

@@ -167,30 +187,28 @@ public class BulkDataGeneratorTool {
     return job;
   }

-  /**
-   * Get the random output directory path where HFiles will be generated
-   * @return
-   */
+  /** Returns Random output directory path where HFiles will be generated */
   protected Path generateOutputDirectory() {
-    final String outputDirectory = OUTPUT_DIRECTORY_PREFIX + "/" + table + "-" + System.currentTimeMillis();
+    final String outputDirectory =
+      OUTPUT_DIRECTORY_PREFIX + "/" + table + "-" + System.currentTimeMillis();
     return new Path(outputDirectory);
   }

   /**
    * This method parses the command line parameters into instance variables
    * @throws ParseException
    */
   protected void readCommandLineParameters(Configuration conf, CommandLine line)
     throws ParseException, IOException {
     final List<String> genericParameters = new ArrayList<String>();

-    //Parse the generic options
+    // Parse the generic options
     for (Map.Entry<Object, Object> entry : line.getOptionProperties("D").entrySet()) {
       genericParameters.add("-D");
       genericParameters.add(entry.getKey() + "=" + entry.getValue());
     }

-    logger.info("Parsed generic parameters: " + Arrays.toString(genericParameters.toArray(new String[0])));
+    logger.info(
+      "Parsed generic parameters: " + Arrays.toString(genericParameters.toArray(new String[0])));

     new GenericOptionsParser(conf, genericParameters.toArray(new String[0]));

@@ -199,7 +217,8 @@ public class BulkDataGeneratorTool {
     mapperCount = Integer.parseInt(line.getOptionValue("mapper-count"));
     Preconditions.checkArgument(mapperCount > 0, "Mapper count must be greater than 0");
     splitCount = Integer.parseInt(line.getOptionValue("split-count"));
-    Preconditions.checkArgument((splitCount > 0) && (splitCount < Utility.MAX_SPLIT_COUNT), "Split count must be greater than 0 and less than " + Utility.MAX_SPLIT_COUNT);
+    Preconditions.checkArgument((splitCount > 0) && (splitCount < Utility.MAX_SPLIT_COUNT),
+      "Split count must be greater than 0 and less than " + Utility.MAX_SPLIT_COUNT);
     rowsPerMapper = Long.parseLong(line.getOptionValue("rows-per-mapper"));
     Preconditions.checkArgument(rowsPerMapper > 0, "Rows per mapper must be greater than 0");
     deleteTableIfExist = line.hasOption("delete-if-exist");

@@ -208,8 +227,8 @@ public class BulkDataGeneratorTool {

   private void parseTableOptions(final CommandLine line) {
     final String tableOptionsAsString = line.getOptionValue("table-options");
-    if(!StringUtils.isEmpty(tableOptionsAsString)) {
-      for(String tableOption : tableOptionsAsString.split(",")) {
+    if (!StringUtils.isEmpty(tableOptionsAsString)) {
+      for (String tableOption : tableOptionsAsString.split(",")) {
         final String[] keyValueSplit = tableOption.split("=");
         final String key = keyValueSplit[0];
         final String value = keyValueSplit[1];

@@ -218,37 +237,34 @@ public class BulkDataGeneratorTool {
     }
   }

-  /**
-   * @return the command line options required by the sor job.
-   */
+  /** Returns the command line option for {@link BulkDataGeneratorTool} */
   protected Options getOptions() {
     final Options options = new Options();
-    Option option = new Option("t", "table", true,
-      "The table name for which data need to be generated.");
+    Option option =
+      new Option("t", "table", true, "The table name for which data need to be generated.");
     options.addOption(option);

     option = new Option("d", "delete-if-exist", false,
       "If it's set, the table will be deleted if already exist.");
     options.addOption(option);

-    option = new Option("mc", "mapper-count", true,
-      "The number of mapper containers to be launched.");
+    option =
+      new Option("mc", "mapper-count", true, "The number of mapper containers to be launched.");
     options.addOption(option);

     option = new Option("sc", "split-count", true,
       "The number of regions/pre-splits to be created for the table.");
     options.addOption(option);

-    option = new Option("r", "rows-per-mapper", true,
-      "The number of rows to be generated PER mapper.");
+    option =
+      new Option("r", "rows-per-mapper", true, "The number of rows to be generated PER mapper.");
     options.addOption(option);

-    option = new Option("o", "table-options", true,
-      "Table options to be set while creating the table.");
+    option =
+      new Option("o", "table-options", true, "Table options to be set while creating the table.");
     options.addOption(option);

-    option = new Option("h", "help", false,
-      "Show help message for the tool");
+    option = new Option("h", "help", false, "Show help message for the tool");
     options.addOption(option);

     return options;

@@ -259,10 +275,11 @@ public class BulkDataGeneratorTool {
     helpFormatter.setWidth(120);
     final String helpMessageCommand = "hbase " + BulkDataGeneratorTool.class.getName();
     final String commandSyntax = helpMessageCommand + " <OPTIONS> [-D<property=value>]*";
-    final String helpMessageSuffix = "Examples:\n"
-      + helpMessageCommand + " -t TEST_TABLE -mc 10 -r 100 -sc 10\n"
-      + helpMessageCommand + " -t TEST_TABLE -mc 10 -r 100 -sc 10 -d -o \"DISABLE_BACKUP=true,NORMALIZATION_ENABLED=false\"\n"
-      + helpMessageCommand + " -t TEST_TABLE -mc 10 -r 100 -sc 10 -Dmapreduce.map.memory.mb=8192 -Dmapreduce.map.java.opts=-Xmx7782m\n";
+    final String helpMessageSuffix = "Examples:\n" + helpMessageCommand
+      + " -t TEST_TABLE -mc 10 -r 100 -sc 10\n" + helpMessageCommand
+      + " -t TEST_TABLE -mc 10 -r 100 -sc 10 -d -o \"DISABLE_BACKUP=true,NORMALIZATION_ENABLED=false\"\n"
+      + helpMessageCommand
+      + " -t TEST_TABLE -mc 10 -r 100 -sc 10 -Dmapreduce.map.memory.mb=8192 -Dmapreduce.map.java.opts=-Xmx7782m\n";
     helpFormatter.printHelp(commandSyntax, "", getOptions(), helpMessageSuffix);
   }
 }
Utility.java

@@ -1,34 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase.util.bulkdatagenerator;

+import java.io.IOException;
+import java.util.Map;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

-import java.io.IOException;
-import java.util.Map;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-
 public class Utility {

   /**
-   * Schema for HBase table to be generated by generated and populated by {@link BulkDataGeneratorTool}
+   * Schema for HBase table to be generated by generated and populated by
+   * {@link BulkDataGeneratorTool}
    */
   public enum TableColumnNames {
-    ORG_ID ("orgId".getBytes()),
-    TOOL_EVENT_ID ("toolEventId".getBytes()),
-    EVENT_ID ("eventId".getBytes()),
-    VEHICLE_ID ("vehicleId".getBytes()),
-    SPEED ("speed".getBytes()),
-    LATITUDE ("latitude".getBytes()),
-    LONGITUDE ("longitude".getBytes()),
-    LOCATION ("location".getBytes()),
-    TIMESTAMP ("timestamp".getBytes());
+    ORG_ID("orgId".getBytes()),
+    TOOL_EVENT_ID("toolEventId".getBytes()),
+    EVENT_ID("eventId".getBytes()),
+    VEHICLE_ID("vehicleId".getBytes()),
+    SPEED("speed".getBytes()),
+    LATITUDE("latitude".getBytes()),
+    LONGITUDE("longitude".getBytes()),
+    LOCATION("location".getBytes()),
+    TIMESTAMP("timestamp".getBytes());

     private final byte[] columnName;

-    TableColumnNames (byte[] column) {
+    TableColumnNames(byte[] column) {
       this.columnName = column;
     }

@@ -49,23 +67,28 @@ public class Utility {
   }

   /**
-   * Creates a pre-splitted HBase Table having single column family ({@link #COLUMN_FAMILY}) and sequential splits with {@link #SPLIT_PREFIX_LENGTH} length character prefix.
-   * Example: If a table (TEST_TABLE_1) need to be generated with splitCount as 10, table would be created with (10+1) regions with boundaries end-keys as (000000-000001, 000001-000002, 000002-000003, ...., 0000010-)
-   * @param admin - Admin object associated with HBase connection
-   * @param tableName - Name of table to be created
-   * @param splitCount - Number of splits for the table (Number of regions will be splitCount + 1)
+   * Creates a pre-splitted HBase Table having single column family ({@link #COLUMN_FAMILY}) and
+   * sequential splits with {@link #SPLIT_PREFIX_LENGTH} length character prefix. Example: If a
+   * table (TEST_TABLE_1) need to be generated with splitCount as 10, table would be created with
+   * (10+1) regions with boundaries end-keys as (000000-000001, 000001-000002, 000002-000003, ....,
+   * 0000010-)
+   * @param admin - Admin object associated with HBase connection
+   * @param tableName - Name of table to be created
+   * @param splitCount - Number of splits for the table (Number of regions will be splitCount + 1)
    * @param tableOptions - Additional HBase metadata properties to be set for the table
    * @throws IOException
    */
-  public static void createTable(Admin admin, String tableName, int splitCount, Map<String, String> tableOptions) throws IOException {
+  public static void createTable(Admin admin, String tableName, int splitCount,
+    Map<String, String> tableOptions) throws IOException {
     Preconditions.checkArgument(splitCount > 0, "Split count must be greater than 0");
-    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
+    TableDescriptorBuilder tableDescriptorBuilder =
+      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
     tableOptions.forEach(tableDescriptorBuilder::setValue);
-    TableDescriptor tableDescriptor = tableDescriptorBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY)).build();
+    TableDescriptor tableDescriptor = tableDescriptorBuilder
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY)).build();
     // Pre-splitting table based on splitCount
     byte[][] splitKeys = new byte[splitCount][];
-    for(int i = 0; i<splitCount; i++) {
-      splitKeys[i] = String.format("%0" + Utility.SPLIT_PREFIX_LENGTH + "d", i+1).getBytes();
+    for (int i = 0; i < splitCount; i++) {
+      splitKeys[i] = String.format("%0" + Utility.SPLIT_PREFIX_LENGTH + "d", i + 1).getBytes();
     }
     admin.createTable(tableDescriptor, splitKeys);
   }
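For context, the tool's own help text (built in printUsage above) documents invocations of the form:

  hbase org.apache.hadoop.hbase.util.bulkdatagenerator.BulkDataGeneratorTool -t TEST_TABLE -mc 10 -r 100 -sc 10 -d -o "DISABLE_BACKUP=true,NORMALIZATION_ENABLED=false"

which creates the pre-split table and bulk-loads 10 mappers x 100 rows into it; the formatting-only changes in this commit do not alter that behavior.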