HBASE-18699 Copy LoadIncrementalHFiles to another package and mark the old one as deprecated

Author: zhangduo
Date: 2017-09-01 20:27:16 +08:00
Parent: 49986e9dfe
Commit: a37417c254
37 changed files with 1830 additions and 1711 deletions
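
The hunks below only repoint call sites at the new org.apache.hadoop.hbase.tool package; the retained org.apache.hadoop.hbase.mapreduce class itself is not among the excerpted files. As a hedged sketch of the "copy and deprecate" pattern named in the commit title (the delegation shape and the constructor are assumptions, not taken from this diff), the old class would end up looking roughly like this:

// Hypothetical sketch only: the original class stays in its old package,
// is flagged deprecated, and defers to the copied implementation in the
// new org.apache.hadoop.hbase.tool package.
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.conf.Configuration;

/**
 * @deprecated Use {@link org.apache.hadoop.hbase.tool.LoadIncrementalHFiles} instead.
 */
@Deprecated
public class LoadIncrementalHFiles extends org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {

  public LoadIncrementalHFiles(Configuration conf) throws Exception {
    super(conf); // assumes the new class keeps a Configuration-based constructor
  }
}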

View File

@ -44,8 +44,8 @@ import org.apache.hadoop.hbase.backup.util.RestoreTool;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
/**
* Restore table implementation
@ -231,7 +231,7 @@ public class RestoreTablesClient {
LoadIncrementalHFiles loader = BackupUtils.createLoader(conf);
for (int i = 0; i < sTableList.size(); i++) {
if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]);
loaderResult = loader.run(mapForSrc[i], tTableArray[i]);
LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
if (loaderResult.isEmpty()) {
String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i];

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.RestoreJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.util.Tool;

View File

@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;

View File

@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;

View File

@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;

View File

@ -36,8 +36,9 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TestLoadIncrementalHFiles;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.TestLoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
@ -46,8 +47,6 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
/**
* 1. Create table t1
* 2. Load data to t1

View File

@ -420,6 +420,10 @@ public class ColumnFamilyDescriptorBuilder {
return this;
}
public String getNameAsString() {
return desc.getNameAsString();
}
public ColumnFamilyDescriptorBuilder setBlockCacheEnabled(boolean value) {
desc.setBlockCacheEnabled(value);
return this;
@ -470,6 +474,10 @@ public class ColumnFamilyDescriptorBuilder {
return this;
}
public Compression.Algorithm getCompressionType() {
return desc.getCompressionType();
}
public ColumnFamilyDescriptorBuilder setConfiguration(final String key, final String value) {
desc.setConfiguration(key, value);
return this;
@ -610,7 +618,7 @@ public class ColumnFamilyDescriptorBuilder {
*/
@InterfaceAudience.Private
public ModifyableColumnFamilyDescriptor(final byte[] name) {
this(isLegalColumnFamilyName(name), getDefaultValuesBytes(), Collections.EMPTY_MAP);
this(isLegalColumnFamilyName(name), getDefaultValuesBytes(), Collections.emptyMap());
}
/**

View File

@ -48,10 +48,10 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
import org.apache.hadoop.hbase.mapreduce.ExportUtils;
import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessControlConstants;

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;

View File

@ -20,9 +20,16 @@ package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
@ -53,6 +60,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
@ -77,15 +85,8 @@ import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
/**
* Test Bulk Load and MR on a distributed cluster.

View File

@ -32,21 +32,22 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.util.ProgramDriver;
/**

View File

@ -42,7 +42,7 @@ import org.apache.hadoop.mapreduce.Partitioner;
*
* <p>This class is not suitable as partitioner creating hfiles
* for incremental bulk loads as region spread will likely change between time of
* hfile creation and load time. See {@link LoadIncrementalHFiles}
* hfile creation and load time. See {@link org.apache.hadoop.hbase.tool.LoadIncrementalHFiles}
* and <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.</p>
*
* @param <KEY> The type of the key.

View File

@ -36,6 +36,8 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -88,6 +90,7 @@ import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@ -109,9 +112,6 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Simple test for {@link HFileOutputFormat2}.
* Sets up and runs a mapreduce job that writes hfile output.

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.snapshot;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;

View File

@ -20,7 +20,7 @@
package org.apache.hadoop.hbase.snapshot;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;

View File

@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
@ -82,6 +81,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;

View File

@ -45,8 +45,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;

View File

@ -3484,7 +3484,7 @@ public class HBaseFsck extends Configured implements Closeable {
errors.print("This sidelined region dir should be bulk loaded: "
+ path.toString());
errors.print("Bulk load command looks like: "
+ "hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles "
+ "hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles "
+ path.toUri().getPath() + " "+ tableName);
}
}

View File

@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
@ -72,9 +71,9 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
@ -86,6 +85,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestRegionObserverInterface {
private static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class);

View File

@ -40,9 +40,9 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;

View File

@ -60,13 +60,13 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
package org.apache.hadoop.hbase.security;
import org.apache.hadoop.hbase.security.UserProvider;

View File

@ -25,6 +25,12 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -93,7 +99,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
@ -118,11 +123,9 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.Permission.Action;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.log4j.Level;
@ -134,11 +137,9 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
/**
* Performs authorization checks for common operations, according to different

View File

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.tool;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
/**
* This class provides shims for HBase to interact with the Hadoop 1.0.x and the
* Hadoop 0.23.x series.
*
* NOTE: No testing done against 0.22.x, or 0.21.x.
*/
abstract public class MapreduceTestingShim {
private static MapreduceTestingShim instance;
private static Class[] emptyParam = new Class[] {};
static {
try {
// This class exists in hadoop 0.22+ but not in Hadoop 20.x/1.x
Class c = Class
.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
instance = new MapreduceV2Shim();
} catch (Exception e) {
instance = new MapreduceV1Shim();
}
}
abstract public JobContext newJobContext(Configuration jobConf)
throws IOException;
abstract public Job newJob(Configuration conf) throws IOException;
abstract public JobConf obtainJobConf(MiniMRCluster cluster);
abstract public String obtainMROutputDirProp();
public static JobContext createJobContext(Configuration jobConf)
throws IOException {
return instance.newJobContext(jobConf);
}
public static JobConf getJobConf(MiniMRCluster cluster) {
return instance.obtainJobConf(cluster);
}
public static Job createJob(Configuration conf) throws IOException {
return instance.newJob(conf);
}
public static String getMROutputDirProp() {
return instance.obtainMROutputDirProp();
}
private static class MapreduceV1Shim extends MapreduceTestingShim {
public JobContext newJobContext(Configuration jobConf) throws IOException {
// Implementing:
// return new JobContext(jobConf, new JobID());
JobID jobId = new JobID();
Constructor<JobContext> c;
try {
c = JobContext.class.getConstructor(Configuration.class, JobID.class);
return c.newInstance(jobConf, jobId);
} catch (Exception e) {
throw new IllegalStateException(
"Failed to instantiate new JobContext(jobConf, new JobID())", e);
}
}
@Override
public Job newJob(Configuration conf) throws IOException {
// Implementing:
// return new Job(conf);
Constructor<Job> c;
try {
c = Job.class.getConstructor(Configuration.class);
return c.newInstance(conf);
} catch (Exception e) {
throw new IllegalStateException(
"Failed to instantiate new Job(conf)", e);
}
}
public JobConf obtainJobConf(MiniMRCluster cluster) {
if (cluster == null) return null;
try {
Object runner = cluster.getJobTrackerRunner();
Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam);
Object tracker = meth.invoke(runner, new Object []{});
Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam);
return (JobConf) m.invoke(tracker, new Object []{});
} catch (NoSuchMethodException nsme) {
return null;
} catch (InvocationTargetException ite) {
return null;
} catch (IllegalAccessException iae) {
return null;
}
}
@Override
public String obtainMROutputDirProp() {
return "mapred.output.dir";
}
};
private static class MapreduceV2Shim extends MapreduceTestingShim {
public JobContext newJobContext(Configuration jobConf) {
return newJob(jobConf);
}
@Override
public Job newJob(Configuration jobConf) {
// Implementing:
// return Job.getInstance(jobConf);
try {
Method m = Job.class.getMethod("getInstance", Configuration.class);
return (Job) m.invoke(null, jobConf); // static method, then arg
} catch (Exception e) {
e.printStackTrace();
throw new IllegalStateException(
"Failed to return from Job.getInstance(jobConf)");
}
}
public JobConf obtainJobConf(MiniMRCluster cluster) {
try {
Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam);
return (JobConf) meth.invoke(cluster, new Object []{});
} catch (NoSuchMethodException nsme) {
return null;
} catch (InvocationTargetException ite) {
return null;
} catch (IllegalAccessException iae) {
return null;
}
}
@Override
public String obtainMROutputDirProp() {
// This is a copy of o.a.h.mapreduce.lib.output.FileOutputFormat.OUTDIR
// from Hadoop 0.23.x. If we use the source directly we break the hadoop 1.x compile.
return "mapreduce.output.fileoutputformat.outputdir";
}
};
}
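
The shim above picks an implementation once, via reflection in the static initializer, so callers never touch the Hadoop-version-specific constructors directly. A brief usage sketch follows (test-scope; the helper names are the static methods defined in the class itself, everything else is placeholder):

// Usage sketch: go through the static helpers so the same test code works
// against either MapReduce API generation.
package org.apache.hadoop.hbase.tool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;

public class MapreduceTestingShimUsage {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = MapreduceTestingShim.createJob(conf); // instead of new Job(conf) / Job.getInstance(conf)
    JobContext context = MapreduceTestingShim.createJobContext(conf);
    String outDirProp = MapreduceTestingShim.getMROutputDirProp();
    System.out.println("Job " + job.getJobName() + ", output dir property: " + outDirProp);
  }
}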

View File

@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
package org.apache.hadoop.hbase.tool;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -37,23 +37,25 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil;
@ -65,11 +67,10 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
/**
* Test cases for the "load" half of the HFileOutputFormat bulk load
* functionality. These tests run faster than the full MR cluster
* tests in TestHFileOutputFormat
* Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run
* faster than the full MR cluster tests in TestHFileOutputFormat
*/
@Category({MapReduceTests.class, LargeTests.class})
@Category({ MiscTests.class, LargeTests.class })
public class TestLoadIncrementalHFiles {
@Rule
public TestName tn = new TestName();
@ -81,18 +82,15 @@ public class TestLoadIncrementalHFiles {
static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;
private static final byte[][] SPLIT_KEYS = new byte[][] {
Bytes.toBytes("ddd"),
Bytes.toBytes("ppp")
};
private static final byte[][] SPLIT_KEYS =
new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };
static HBaseTestingUtility util = new HBaseTestingUtility();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
util.getConfiguration().setInt(
LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
MAX_FILES_PER_REGION_PER_FAMILY);
// change default behavior so that tag values are returned with normal rpcs
util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
@ -114,23 +112,19 @@ public class TestLoadIncrementalHFiles {
@Test(timeout = 120000)
public void testSimpleLoadWithMap() throws Exception {
runTest("testSimpleLoadWithMap", BloomType.NONE,
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
}, true);
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
true);
}
/**
* Test case that creates some regions and loads
* HFiles that fit snugly inside those regions
* Test case that creates some regions and loads HFiles that fit snugly inside those regions
*/
@Test(timeout = 120000)
public void testSimpleLoad() throws Exception {
runTest("testSimpleLoad", BloomType.NONE,
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
});
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, });
}
@Test(timeout = 120000)
@ -138,23 +132,19 @@ public class TestLoadIncrementalHFiles {
String testName = tn.getMethodName();
final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE,
false, null, new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
}, false, true);
false, null, new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
false, true);
}
/**
* Test case that creates some regions and loads
* HFiles that cross the boundaries of those regions
* Test case that creates some regions and loads HFiles that cross the boundaries of those regions
*/
@Test(timeout = 120000)
public void testRegionCrossingLoad() throws Exception {
runTest("testRegionCrossingLoad", BloomType.NONE,
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
});
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
}
/**
@ -163,10 +153,8 @@ public class TestLoadIncrementalHFiles {
@Test(timeout = 60000)
public void testRegionCrossingRowBloom() throws Exception {
runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
});
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
}
/**
@ -175,33 +163,26 @@ public class TestLoadIncrementalHFiles {
@Test(timeout = 120000)
public void testRegionCrossingRowColBloom() throws Exception {
runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
});
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
}
/**
* Test case that creates some regions and loads HFiles that have
* different region boundaries than the table pre-split.
* Test case that creates some regions and loads HFiles that have different region boundaries than
* the table pre-split.
*/
@Test(timeout = 120000)
public void testSimpleHFileSplit() throws Exception {
runTest("testHFileSplit", BloomType.NONE,
new byte[][] {
Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
},
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
new byte[][]{ Bytes.toBytes("mmm"), Bytes.toBytes("zzz") },
}
);
new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, });
}
/**
* Test case that creates some regions and loads HFiles that cross the boundaries
* and have different region boundaries than the table pre-split.
* Test case that creates some regions and loads HFiles that cross the boundaries and have
* different region boundaries than the table pre-split.
*/
@Test(timeout = 60000)
public void testRegionCrossingHFileSplit() throws Exception {
@ -209,8 +190,8 @@ public class TestLoadIncrementalHFiles {
}
/**
* Test case that creates some regions and loads HFiles that cross the boundaries
* have a ROW bloom filter and a different region boundaries than the table pre-split.
* Test case that creates some regions and loads HFiles that cross the boundaries have a ROW bloom
* filter and a different region boundaries than the table pre-split.
*/
@Test(timeout = 120000)
public void testRegionCrossingHFileSplitRowBloom() throws Exception {
@ -218,8 +199,8 @@ public class TestLoadIncrementalHFiles {
}
/**
* Test case that creates some regions and loads HFiles that cross the boundaries
* have a ROWCOL bloom filter and a different region boundaries than the table pre-split.
* Test case that creates some regions and loads HFiles that cross the boundaries have a ROWCOL
* bloom filter and a different region boundaries than the table pre-split.
*/
@Test(timeout = 120000)
public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
@ -229,63 +210,47 @@ public class TestLoadIncrementalHFiles {
@Test
public void testSplitALot() throws Exception {
runTest("testSplitALot", BloomType.NONE,
new byte[][] {
Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"),
Bytes.toBytes("ccc"), Bytes.toBytes("ddd"),
Bytes.toBytes("eee"), Bytes.toBytes("fff"),
Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
Bytes.toBytes("iii"), Bytes.toBytes("lll"),
Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
Bytes.toBytes("ooo"), Bytes.toBytes("ppp"),
Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
Bytes.toBytes("zzz"),
},
new byte[][][] {
new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") },
}
);
new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), },
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, });
}
private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
new byte[][] {
Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
},
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
}
);
new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
}
private HTableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
HTableDescriptor htd = new HTableDescriptor(tableName);
HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
familyDesc.setBloomFilterType(bloomType);
htd.addFamily(familyDesc);
return htd;
private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
return TableDescriptorBuilder.newBuilder(tableName)
.addColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build())
.build();
}
private void runTest(String testName, BloomType bloomType,
byte[][][] hfileRanges) throws Exception {
private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges)
throws Exception {
runTest(testName, bloomType, null, hfileRanges);
}
private void runTest(String testName, BloomType bloomType,
byte[][][] hfileRanges, boolean useMap) throws Exception {
private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap)
throws Exception {
runTest(testName, bloomType, null, hfileRanges, useMap);
}
private void runTest(String testName, BloomType bloomType,
byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
byte[][][] hfileRanges) throws Exception {
runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
}
private void runTest(String testName, BloomType bloomType,
byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) throws Exception {
private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
byte[][][] hfileRanges, boolean useMap) throws Exception {
final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
final boolean preCreateTable = tableSplitKeys != null;
@ -303,17 +268,17 @@ public class TestLoadIncrementalHFiles {
private void runTest(String testName, TableName tableName, BloomType bloomType,
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
throws Exception {
HTableDescriptor htd = buildHTD(tableName, bloomType);
TableDescriptor htd = buildHTD(tableName, bloomType);
runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
}
public static int loadHFiles(String testName, HTableDescriptor htd, HBaseTestingUtility util,
public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
byte[][][] hfileRanges, boolean useMap, boolean deleteFile,
boolean copyFiles, int initRowCount, int factor) throws Exception {
byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
int initRowCount, int factor) throws Exception {
Path dir = util.getDataTestDirOnTestFS(testName);
FileSystem fs = util.getTestFileSystem();
dir = dir.makeQualified(fs);
dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
Path familyDir = new Path(dir, Bytes.toString(fam));
int hfileIdx = 0;
@ -339,8 +304,8 @@ public class TestLoadIncrementalHFiles {
}
int expectedRows = hfileIdx * factor;
final TableName tableName = htd.getTableName();
if (!util.getHBaseAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
TableName tableName = htd.getTableName();
if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
util.getAdmin().createTable(htd, tableSplitKeys);
}
@ -351,12 +316,14 @@ public class TestLoadIncrementalHFiles {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
String[] args = { dir.toString(), tableName.toString() };
if (useMap) {
if (deleteFile) fs.delete(last);
Map<LoadQueueItem, ByteBuffer> loaded = loader.run(null, map, tableName);
if (deleteFile) {
fs.delete(last, true);
}
Map<LoadQueueItem, ByteBuffer> loaded = loader.run(map, tableName);
if (deleteFile) {
expectedRows -= 1000;
for (LoadQueueItem item : loaded.keySet()) {
if (item.hfilePath.getName().equals(last.getName())) {
if (item.getFilePath().getName().equals(last.getName())) {
fail(last + " should be missing");
}
}
@ -381,15 +348,16 @@ public class TestLoadIncrementalHFiles {
return expectedRows;
}
private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
private void runTest(String testName, TableDescriptor htd, BloomType bloomType,
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
boolean copyFiles) throws Exception {
loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys,
hfileRanges, useMap, true, copyFiles, 0, 1000);
loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
useMap, true, copyFiles, 0, 1000);
final TableName tableName = htd.getTableName();
// verify staging folder has been cleaned up
Path stagingBasePath = new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
Path stagingBasePath =
new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
FileSystem fs = util.getTestFileSystem();
if (fs.exists(stagingBasePath)) {
FileStatus[] files = fs.listStatus(stagingBasePath);
@ -403,33 +371,29 @@ public class TestLoadIncrementalHFiles {
}
/**
* Test that tags survive through a bulk load that needs to split hfiles.
*
* This test depends on the "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client
* can get tags in the responses.
* Test that tags survive through a bulk load that needs to split hfiles. This test depends on the
* "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client can get tags in the
* responses.
*/
@Test(timeout = 60000)
public void testTagsSurviveBulkLoadSplit() throws Exception {
Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
FileSystem fs = util.getTestFileSystem();
dir = dir.makeQualified(fs);
dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
Path familyDir = new Path(dir, Bytes.toString(FAMILY));
// table has these split points
byte [][] tableSplitKeys = new byte[][] {
Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
};
byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"),
Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };
// creating an hfile that has values that span the split points.
byte[] from = Bytes.toBytes("ddd");
byte[] to = Bytes.toBytes("ooo");
HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
new Path(familyDir, tn.getMethodName()+"_hfile"),
FAMILY, QUALIFIER, from, to, 1000);
new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000);
int expectedRows = 1000;
TableName tableName = TableName.valueOf(tn.getMethodName());
HTableDescriptor htd = buildHTD(tableName, BloomType.NONE);
TableDescriptor htd = buildHTD(tableName, BloomType.NONE);
util.getAdmin().createTable(htd, tableSplitKeys);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
@ -453,18 +417,17 @@ public class TestLoadIncrementalHFiles {
@Test(timeout = 60000)
public void testNonexistentColumnFamilyLoad() throws Exception {
String testName = tn.getMethodName();
byte[][][] hFileRanges = new byte[][][] {
new byte[][]{ Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
};
byte[][][] hFileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, };
final byte[] TABLE = Bytes.toBytes("mytable_"+testName);
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
byte[] TABLE = Bytes.toBytes("mytable_" + testName);
// set real family name to upper case in purpose to simulate the case that
// family name in HFiles is invalid
HColumnDescriptor family =
new HColumnDescriptor(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT)));
htd.addFamily(family);
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE))
.addColumnFamily(ColumnFamilyDescriptorBuilder
.of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT))))
.build();
try {
runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false);
@ -473,8 +436,9 @@ public class TestLoadIncrementalHFiles {
assertTrue("IOException expected", e instanceof IOException);
// further check whether the exception message is correct
String errMsg = e.getMessage();
assertTrue("Incorrect exception message, expected message: ["
+ EXPECTED_MSG_FOR_NON_EXISTING_FAMILY + "], current message: [" + errMsg + "]",
assertTrue(
"Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY +
"], current message: [" + errMsg + "]",
errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
}
}
@ -490,18 +454,18 @@ public class TestLoadIncrementalHFiles {
}
/**
* Write a random data file and a non-file in a dir with a valid family name
* but not part of the table families. we should we able to bulkload without
* getting the unmatched family exception. HBASE-13037/HBASE-13227
* Write a random data file and a non-file in a dir with a valid family name but not part of the
* table families. we should we able to bulkload without getting the unmatched family exception.
* HBASE-13037/HBASE-13227
*/
private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
Path dir = util.getDataTestDirOnTestFS(tableName);
FileSystem fs = util.getTestFileSystem();
dir = dir.makeQualified(fs);
dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
Path familyDir = new Path(dir, Bytes.toString(FAMILY));
HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"),
FAMILY, QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY,
QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);
final String NON_FAMILY_FOLDER = "_logs";
@ -529,8 +493,7 @@ public class TestLoadIncrementalHFiles {
}
}
private static void createRandomDataFile(FileSystem fs, Path path, int size)
throws IOException {
private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException {
FSDataOutputStream stream = fs.create(path);
try {
byte[] data = new byte[1024];
@ -554,18 +517,15 @@ public class TestLoadIncrementalHFiles {
Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
FileSystem fs = util.getTestFileSystem();
Path testIn = new Path(dir, "testhfile");
HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
Path bottomOut = new Path(dir, "bottom.out");
Path topOut = new Path(dir, "top.out");
LoadIncrementalHFiles.splitStoreFile(
util.getConfiguration(), testIn,
familyDesc, Bytes.toBytes("ggg"),
bottomOut,
topOut);
LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
Bytes.toBytes("ggg"), bottomOut, topOut);
int rowCount = verifyHFile(bottomOut);
rowCount += verifyHFile(topOut);
@ -597,20 +557,16 @@ public class TestLoadIncrementalHFiles {
Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
FileSystem fs = util.getTestFileSystem();
Path testIn = new Path(dir, "testhfile");
HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
familyDesc.setDataBlockEncoding(cfEncoding);
HFileTestUtil.createHFileWithDataBlockEncoding(
util.getConfiguration(), fs, testIn, bulkloadEncoding,
FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
ColumnFamilyDescriptor familyDesc =
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
Path bottomOut = new Path(dir, "bottom.out");
Path topOut = new Path(dir, "top.out");
LoadIncrementalHFiles.splitStoreFile(
util.getConfiguration(), testIn,
familyDesc, Bytes.toBytes("ggg"),
bottomOut,
topOut);
LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
Bytes.toBytes("ggg"), bottomOut, topOut);
int rowCount = verifyHFile(bottomOut);
rowCount += verifyHFile(topOut);
@ -645,44 +601,49 @@ public class TestLoadIncrementalHFiles {
public void testInferBoundaries() {
TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
/* Toy example
* c---------i o------p s---------t v------x
* a------e g-----k m-------------q r----s u----w
*
* Should be inferred as:
* a-----------------k m-------------q r--------------t u---------x
*
* The output should be (m,r,u)
/*
* Toy example c---------i o------p s---------t v------x a------e g-----k m-------------q r----s
* u----w Should be inferred as: a-----------------k m-------------q r--------------t
* u---------x The output should be (m,r,u)
*/
String first;
String last;
first = "a"; last = "e";
first = "a";
last = "e";
addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
first = "r"; last = "s";
first = "r";
last = "s";
addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
first = "o"; last = "p";
first = "o";
last = "p";
addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
first = "g"; last = "k";
first = "g";
last = "k";
addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
first = "v"; last = "x";
first = "v";
last = "x";
addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
first = "c"; last = "i";
first = "c";
last = "i";
addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
first = "m"; last = "q";
first = "m";
last = "q";
addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
first = "s"; last = "t";
first = "s";
last = "t";
addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
first = "u"; last = "w";
first = "u";
last = "w";
addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
@ -702,14 +663,14 @@ public class TestLoadIncrementalHFiles {
public void testLoadTooMayHFiles() throws Exception {
Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
FileSystem fs = util.getTestFileSystem();
dir = dir.makeQualified(fs);
dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
Path familyDir = new Path(dir, Bytes.toString(FAMILY));
byte[] from = Bytes.toBytes("begin");
byte[] to = Bytes.toBytes("end");
for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ i), FAMILY, QUALIFIER, from, to, 1000);
HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i),
FAMILY, QUALIFIER, from, to, 1000);
}
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
@ -718,8 +679,8 @@ public class TestLoadIncrementalHFiles {
loader.run(args);
fail("Bulk loading too many files should fail");
} catch (IOException ie) {
assertTrue(ie.getMessage().contains("Trying to load more than "
+ MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
assertTrue(ie.getMessage()
.contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
}
}
@ -760,4 +721,3 @@ public class TestLoadIncrementalHFiles {
}
}
}

View File

@ -15,8 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
package org.apache.hadoop.hbase.tool;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -28,9 +29,9 @@ import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -38,11 +39,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
@ -50,6 +49,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
@ -57,15 +57,20 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
@ -77,15 +82,10 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
/**
* Test cases for the atomic load error handling of the bulk load functionality.
*/
@Category({MapReduceTests.class, LargeTests.class})
@Category({ MiscTests.class, LargeTests.class })
public class TestLoadIncrementalHFilesSplitRecovery {
private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
@ -120,8 +120,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
return Bytes.toBytes(String.format("%010d", i));
}
public static void buildHFiles(FileSystem fs, Path dir, int value)
throws IOException {
public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
byte[] val = value(value);
for (int i = 0; i < NUM_CFS; i++) {
Path testIn = new Path(dir, family(i));
@ -131,20 +130,23 @@ public class TestLoadIncrementalHFilesSplitRecovery {
}
}
private TableDescriptor createTableDesc(TableName name, int cfs) {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
.forEachOrdered(builder::addColumnFamily);
return builder.build();
}
/**
* Creates a table with given table name and specified number of column
* families if the table does not already exist.
   * Creates a table with the given table name and the specified number of column families if the
   * table does not already exist.
*/
private void setupTable(final Connection connection, TableName table, int cfs)
throws IOException {
try {
LOG.info("Creating table " + table);
HTableDescriptor htd = new HTableDescriptor(table);
for (int i = 0; i < cfs; i++) {
htd.addFamily(new HColumnDescriptor(family(i)));
}
try (Admin admin = connection.getAdmin()) {
admin.createTable(htd);
admin.createTable(createTableDesc(table, cfs));
}
} catch (TableExistsException tee) {
LOG.info("Table " + table + " already exists");
@ -162,12 +164,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
throws IOException {
try {
LOG.info("Creating table " + table);
HTableDescriptor htd = new HTableDescriptor(table);
for (int i = 0; i < cfs; i++) {
htd.addFamily(new HColumnDescriptor(family(i)));
}
util.createTable(htd, SPLIT_KEYS);
util.createTable(createTableDesc(table, cfs), SPLIT_KEYS);
} catch (TableExistsException tee) {
LOG.info("Table " + table + " already exists");
}
@ -204,8 +201,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
    // need the call to the region server to be synchronous, but it isn't visible.
HRegionServer hrs = util.getRSForFirstRegionInTable(table);
for (HRegionInfo hri :
ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
if (hri.getTable().equals(table)) {
util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
// ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
@ -216,8 +212,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
int regions;
do {
regions = 0;
for (HRegionInfo hri :
ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
if (hri.getTable().equals(table)) {
regions++;
}
@ -247,38 +242,29 @@ public class TestLoadIncrementalHFilesSplitRecovery {
}
/**
* Checks that all columns have the expected value and that there is the
* expected number of rows.
* Checks that all columns have the expected value and that there is the expected number of rows.
* @throws IOException
*/
void assertExpectedTable(TableName table, int count, int value) throws IOException {
HTableDescriptor [] htds = util.getAdmin().listTables(table.getNameAsString());
assertEquals(htds.length, 1);
Table t = null;
try {
t = util.getConnection().getTable(table);
Scan s = new Scan();
ResultScanner sr = t.getScanner(s);
List<TableDescriptor> htds = util.getAdmin().listTableDescriptors(table.getNameAsString());
assertEquals(htds.size(), 1);
try (Table t = util.getConnection().getTable(table);
ResultScanner sr = t.getScanner(new Scan())) {
int i = 0;
for (Result r : sr) {
for (Result r; (r = sr.next()) != null;) {
r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
.forEach(v -> assertArrayEquals(value(value), v));
i++;
for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
for (byte[] val : nm.values()) {
assertTrue(Bytes.equals(val, value(value)));
}
}
}
assertEquals(count, i);
} catch (IOException e) {
fail("Failed due to exception");
} finally {
if (t != null) t.close();
}
}
/**
* Test that shows that exception thrown from the RS side will result in an
* exception on the LIHFile client.
   * Test that shows that an exception thrown from the RS side will result in an exception on the
   * LIHFile client.
*/
@Test(expected = IOException.class, timeout = 120000)
public void testBulkLoadPhaseFailure() throws Exception {
@ -286,11 +272,9 @@ public class TestLoadIncrementalHFilesSplitRecovery {
final AtomicInteger attmptedCalls = new AtomicInteger();
final AtomicInteger failedCalls = new AtomicInteger();
util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
try (Connection connection = ConnectionFactory.createConnection(util
.getConfiguration())) {
try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
setupTable(connection, table, 10);
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
util.getConfiguration()) {
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
@Override
protected List<LoadQueueItem> tryAtomicRegionLoad(
ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first,
@ -329,32 +313,28 @@ public class TestLoadIncrementalHFilesSplitRecovery {
}
/**
* Test that shows that exception thrown from the RS side will result in the
* expected number of retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER}
* when ${@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set
   * Test that shows that an exception thrown from the RS side will result in the expected number of
   * retries set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when
   * {@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set.
*/
@Test
public void testRetryOnIOException() throws Exception {
final TableName table = TableName.valueOf(name.getMethodName());
final AtomicInteger calls = new AtomicInteger(1);
final Connection conn = ConnectionFactory.createConnection(util
.getConfiguration());
final Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
util.getConfiguration().setBoolean(
LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
util.getConfiguration()) {
util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
@Override
protected List<LoadQueueItem> tryAtomicRegionLoad(
ClientServiceCallable<byte[]> serverCallable, TableName tableName,
final byte[] first, Collection<LoadQueueItem> lqis)
throws IOException {
ClientServiceCallable<byte[]> serverCallable, TableName tableName, final byte[] first,
Collection<LoadQueueItem> lqis) throws IOException {
if (calls.getAndIncrement() < util.getConfiguration().getInt(
HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) {
ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(
conn, tableName, first, new RpcControllerFactory(
util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) -
1) {
ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(conn,
tableName, first, new RpcControllerFactory(util.getConfiguration()).newController(),
HConstants.PRIORITY_UNSET) {
@Override
public byte[] rpcCall() throws Exception {
throw new IOException("Error calling something on RegionServer");
@ -368,14 +348,11 @@ public class TestLoadIncrementalHFilesSplitRecovery {
};
setupTable(conn, table, 10);
Path dir = buildBulkFiles(table, 1);
lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table),
conn.getRegionLocator(table));
util.getConfiguration().setBoolean(
LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table));
util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
}
@SuppressWarnings("deprecation")
private ClusterConnection getMockedConnection(final Configuration conf)
throws IOException, org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException {
ClusterConnection c = Mockito.mock(ClusterConnection.class);
@ -384,25 +361,24 @@ public class TestLoadIncrementalHFilesSplitRecovery {
// Make it so we return a particular location when asked.
final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
ServerName.valueOf("example.org", 1234, 0));
Mockito.when(c.getRegionLocation((TableName) Mockito.any(),
(byte[]) Mockito.any(), Mockito.anyBoolean())).
thenReturn(loc);
Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
thenReturn(loc);
Mockito.when(
c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean()))
.thenReturn(loc);
Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc);
ClientProtos.ClientService.BlockingInterface hri =
Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
Mockito.when(hri.bulkLoadHFile((RpcController)Mockito.any(), (BulkLoadHFileRequest)Mockito.any())).
thenThrow(new ServiceException(new IOException("injecting bulk load error")));
Mockito.when(c.getClient(Mockito.any(ServerName.class))).
thenReturn(hri);
Mockito
.when(
hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any()))
.thenThrow(new ServiceException(new IOException("injecting bulk load error")));
Mockito.when(c.getClient(Mockito.any(ServerName.class))).thenReturn(hri);
return c;
}
/**
* This test exercises the path where there is a split after initial
* validation but before the atomic bulk load call. We cannot use presplitting
* to test this path, so we actually inject a split just before the atomic
* region load.
* This test exercises the path where there is a split after initial validation but before the
* atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
* split just before the atomic region load.
*/
@Test(timeout = 120000)
public void testSplitWhileBulkLoadPhase() throws Exception {
@ -420,8 +396,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
protected void bulkLoadPhase(final Table htable, final Connection conn,
ExecutorService pool, Deque<LoadQueueItem> queue,
final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
Map<LoadQueueItem, ByteBuffer> item2RegionMap)
throws IOException {
Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
int i = attemptedCalls.incrementAndGet();
if (i == 1) {
// On first attempt force a split.
@ -448,8 +423,8 @@ public class TestLoadIncrementalHFilesSplitRecovery {
}
/**
* This test splits a table and attempts to bulk load. The bulk import files
* should be split before atomically importing.
* This test splits a table and attempts to bulk load. The bulk import files should be split
* before atomically importing.
*/
@Test(timeout = 120000)
public void testGroupOrSplitPresplit() throws Exception {
@ -461,15 +436,13 @@ public class TestLoadIncrementalHFilesSplitRecovery {
forceSplit(table);
final AtomicInteger countedLqis = new AtomicInteger();
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
util.getConfiguration()) {
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
@Override
protected Pair<List<LoadQueueItem>, String> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final Table htable,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
startEndKeys);
Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
Pair<List<LoadQueueItem>, String> lqis =
super.groupOrSplit(regionGroups, item, htable, startEndKeys);
if (lqis != null && lqis.getFirst() != null) {
countedLqis.addAndGet(lqis.getFirst().size());
}
@ -490,15 +463,15 @@ public class TestLoadIncrementalHFilesSplitRecovery {
}
/**
* This test creates a table with many small regions. The bulk load files
* would be splitted multiple times before all of them can be loaded successfully.
   * This test creates a table with many small regions. The bulk load files would be split
   * multiple times before all of them can be loaded successfully.
*/
@Test(timeout = 120000)
public void testSplitTmpFileCleanUp() throws Exception {
final TableName table = TableName.valueOf(name.getMethodName());
byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"),
Bytes.toBytes("row_00000040"), Bytes.toBytes("row_00000050")};
Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
Bytes.toBytes("row_00000050") };
try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
@ -526,8 +499,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
}
/**
* This simulates an remote exception which should cause LIHF to exit with an
* exception.
   * This simulates a remote exception which should cause LIHF to exit with an exception.
*/
@Test(expected = IOException.class, timeout = 120000)
public void testGroupOrSplitFailure() throws Exception {
@ -535,15 +507,13 @@ public class TestLoadIncrementalHFilesSplitRecovery {
try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
setupTable(connection, tableName, 10);
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
util.getConfiguration()) {
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
int i = 0;
@Override
protected Pair<List<LoadQueueItem>, String> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final Table table,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
i++;
if (i == 5) {
@ -583,11 +553,10 @@ public class TestLoadIncrementalHFilesSplitRecovery {
@Override
protected Pair<List<LoadQueueItem>, String> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final Table htable,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
startEndKeys);
Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
Pair<List<LoadQueueItem>, String> lqis =
super.groupOrSplit(regionGroups, item, htable, startEndKeys);
if (lqis != null && lqis.getFirst() != null) {
countedLqis.addAndGet(lqis.getFirst().size());
}
@ -637,33 +606,23 @@ public class TestLoadIncrementalHFilesSplitRecovery {
}
/**
* Checks that all columns have the expected value and that there is the
* expected number of rows.
* Checks that all columns have the expected value and that there is the expected number of rows.
* @throws IOException
*/
void assertExpectedTable(final Connection connection, TableName table, int count, int value)
throws IOException {
HTableDescriptor [] htds = util.getAdmin().listTables(table.getNameAsString());
assertEquals(htds.length, 1);
Table t = null;
try {
t = connection.getTable(table);
Scan s = new Scan();
ResultScanner sr = t.getScanner(s);
List<TableDescriptor> htds = util.getAdmin().listTableDescriptors(table.getNameAsString());
assertEquals(htds.size(), 1);
try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
int i = 0;
for (Result r : sr) {
for (Result r; (r = sr.next()) != null;) {
r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
.forEach(v -> assertArrayEquals(value(value), v));
i++;
for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
for (byte[] val : nm.values()) {
assertTrue(Bytes.equals(val, value(value)));
}
}
}
assertEquals(count, i);
} catch (IOException e) {
fail("Failed due to exception");
} finally {
if (t != null) t.close();
}
}
}
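
The createTableDesc() change above swaps the deprecated HTableDescriptor/HColumnDescriptor pair for the descriptor builder API. Below is a minimal standalone sketch of the same pattern, assuming a hypothetical table named "example" with two column families; only the builder calls seen in the diff (TableDescriptorBuilder.newBuilder, ColumnFamilyDescriptorBuilder.of, addColumnFamily) are taken from this change, everything else is illustrative.

import java.io.IOException;
import java.util.stream.IntStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateTableWithBuilders {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin()) {
      // Build the descriptor with the builder API instead of the deprecated
      // HTableDescriptor/HColumnDescriptor constructors.
      TableDescriptorBuilder builder =
          TableDescriptorBuilder.newBuilder(TableName.valueOf("example"));
      IntStream.range(0, 2)
          .mapToObj(i -> ColumnFamilyDescriptorBuilder.of("family_" + i))
          .forEachOrdered(builder::addColumnFamily);
      TableDescriptor desc = builder.build();
      // Mirror the test helper's behavior: only create the table if it is missing.
      if (!admin.tableExists(desc.getTableName())) {
        admin.createTable(desc);
      }
    }
  }
}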

View File

@ -17,31 +17,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
package org.apache.hadoop.hbase.tool;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
/**
* Reruns TestLoadIncrementalHFiles using LoadIncrementalHFiles in secure mode.
* This suite is unable to verify the security handoff/turnover
* as miniCluster is running as system user thus has root privileges
* and delegation tokens don't seem to work on miniDFS.
*
* Thus SecureBulkload can only be completely verified by running
* integration tests against a secure cluster. This suite is still
* invaluable as it verifies the other mechanisms that need to be
* Reruns TestLoadIncrementalHFiles using LoadIncrementalHFiles in secure mode. This suite is unable
* to verify the security handoff/turnover as miniCluster is running as system user thus has root
* privileges and delegation tokens don't seem to work on miniDFS.
* <p>
* Thus SecureBulkload can only be completely verified by running integration tests against a secure
* cluster. This suite is still invaluable as it verifies the other mechanisms that need to be
* supported as part of a LoadIncrementalFiles call.
*/
@Category({MapReduceTests.class, LargeTests.class})
@Category({ MiscTests.class, LargeTests.class })
public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles {
@BeforeClass
@ -51,8 +49,7 @@ public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles{
HadoopSecurityEnabledUserProviderForTesting.class);
// setup configuration
SecureTestUtil.enableSecurity(util.getConfiguration());
util.getConfiguration().setInt(
LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
MAX_FILES_PER_REGION_PER_FAMILY);
// change default behavior so that tag values are returned with normal rpcs
util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
@ -67,4 +64,3 @@ public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles{
}
}

View File

@ -15,34 +15,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
package org.apache.hadoop.hbase.tool;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Reruns TestSecureLoadIncrementalHFilesSplitRecovery
* using LoadIncrementalHFiles in secure mode.
* This suite is unable to verify the security handoff/turnove
* as miniCluster is running as system user thus has root privileges
* and delegation tokens don't seem to work on miniDFS.
*
* Thus SecureBulkload can only be completely verified by running
* integration tests against a secure cluster. This suite is still
* invaluable as it verifies the other mechanisms that need to be
* Reruns TestSecureLoadIncrementalHFilesSplitRecovery using LoadIncrementalHFiles in secure mode.
 * This suite is unable to verify the security handoff/turnover as miniCluster is running as system
* user thus has root privileges and delegation tokens don't seem to work on miniDFS.
* <p>
* Thus SecureBulkload can only be completely verified by running integration tests against a secure
* cluster. This suite is still invaluable as it verifies the other mechanisms that need to be
* supported as part of a LoadIncrementalFiles call.
*/
@Category({MapReduceTests.class, LargeTests.class})
public class TestSecureLoadIncrementalHFilesSplitRecovery extends TestLoadIncrementalHFilesSplitRecovery {
@Category({ MiscTests.class, LargeTests.class })
public class TestSecureLoadIncrementalHFilesSplitRecovery
extends TestLoadIncrementalHFilesSplitRecovery {
// This "overrides" the parent static method
// make sure they are in sync

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;

View File

@ -41,7 +41,7 @@ import java.util.List;
* path/to/hbase-spark.jar {path/to/output/HFiles}
*
 * This example will output HFiles in {path/to/output/HFiles}, and the user can run
* 'hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles' to load the HFiles into table to verify this example.
* 'hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles' to load the HFiles into table to verify this example.
*/
final public class JavaHBaseBulkLoadExample {
private JavaHBaseBulkLoadExample() {}
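
The comment above shows the command-line route; the same load can also be issued programmatically against the relocated tool class. A minimal sketch follows, assuming a hypothetical target table and an HFile directory written by the Spark job; only the LoadIncrementalHFiles(conf) constructor and the doBulkLoad(dir, admin, table, regionLocator) call, both exercised by the tests in this change, are taken from the source.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

public class BulkLoadHFilesSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("bulkload_table"); // hypothetical target table
    Path hfileDir = new Path("/path/to/output/HFiles");        // directory produced by the Spark job
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(tableName);
        RegionLocator locator = conn.getRegionLocator(tableName)) {
      // Same call the tests in this change use: moves the HFiles into the table's regions.
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      loader.doBulkLoad(hfileDir, admin, table, locator);
    }
  }
}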

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.spark
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{Get, ConnectionFactory}
import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile}
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.{HConstants, CellUtil, HBaseTestingUtility, TableName}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._

View File

@ -577,7 +577,7 @@ There are two ways to invoke this utility, with explicit classname and via the d
.Explicit Classname
----
$ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>
$ bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>
----
.Driver