HBASE-11567 Write bulk load COMMIT events to WAL

Jeffrey Zhong 2015-02-04 15:52:01 -08:00
parent a7b8112555
commit b0b0a74fef
13 changed files with 2775 additions and 1095 deletions

View File

@ -122,6 +122,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.access.TablePermission;
import org.apache.hadoop.hbase.security.access.UserPermission;
@ -2610,8 +2612,7 @@ public final class ProtobufUtil {
.setServer(toServerName(server));
for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()
.setFamilyName(ByteStringer.wrap(entry.getKey()))
.setStoreHomeDir(Bytes.toString(entry.getKey()));
for (Path path : entry.getValue()) {
@ -2828,4 +2829,34 @@ public final class ProtobufUtil {
}
return result;
}
/**
* Generates a marker for the WAL so that we propagate the notion of a bulk region load
* throughout the WAL.
*
* @param tableName The table into which the bulk load is being imported.
* @param encodedRegionName Encoded region name of the region which is being bulk loaded.
* @param storeFiles The store files being bulk loaded, keyed by column family.
* @param bulkloadSeqId sequence ID (obtained via a forced flush) used to create the bulk
* load hfile names
* @return The WAL log marker for bulk loads.
*/
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles, long bulkloadSeqId) {
BulkLoadDescriptor.Builder desc = BulkLoadDescriptor.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
WALProtos.StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()
.setFamilyName(ByteStringer.wrap(entry.getKey()))
.setStoreHomeDir(Bytes.toString(entry.getKey())); // relative to region
for (Path path : entry.getValue()) {
builder.addStoreFile(path.getName());
}
desc.addStores(builder);
}
return desc.build();
}
}
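For orientation, a minimal sketch of how a caller might drive this new helper; the table name, encoded region name, and staging paths below are hypothetical placeholders:

  // Describe two HFiles bulk loaded into column family "cf1".
  Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
  storeFiles.put(Bytes.toBytes("cf1"),
      Arrays.asList(new Path("/staging/cf1/hfile1"), new Path("/staging/cf1/hfile2")));
  BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(
      TableName.valueOf("myTable"),
      ByteStringer.wrap(Bytes.toBytes("0123456789abcdef0123456789abcdef")), // hypothetical encoded region name
      storeFiles,
      42L); // sequence id returned by the forced flush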

View File

@ -16611,7 +16611,7 @@ public final class FilterProtos {
/**
* <code>repeated .RowRange row_range_list = 1;</code>
*/
java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder>
getRowRangeListOrBuilderList();
/**
* <code>repeated .RowRange row_range_list = 1;</code>
@ -17270,12 +17270,12 @@ public final class FilterProtos {
/**
* <code>repeated .RowRange row_range_list = 1;</code>
*/
public java.util.List<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder>
getRowRangeListBuilderList() {
return getRowRangeListFieldBuilder().getBuilderList();
}
private com.google.protobuf.RepeatedFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder>
getRowRangeListFieldBuilder() {
if (rowRangeListBuilder_ == null) {
rowRangeListBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<

View File

@ -5092,7 +5092,7 @@ public final class VisibilityLabelsProtos {
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
@ -5108,7 +5108,7 @@ public final class VisibilityLabelsProtos {
getRegexBytes() {
java.lang.Object ref = regex_;
if (ref instanceof java.lang.String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
regex_ = b;
@ -5414,7 +5414,7 @@ public final class VisibilityLabelsProtos {
getRegexBytes() {
java.lang.Object ref = regex_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
regex_ = b;

View File

@ -22,6 +22,7 @@ option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "HBase.proto";
import "Client.proto";
message WALHeader {
optional bool has_compression = 1;
@ -123,6 +124,22 @@ message FlushDescriptor {
repeated StoreFlushDescriptor store_flushes = 5;
}
message StoreDescriptor {
required bytes family_name = 1;
required string store_home_dir = 2; //relative to region dir
repeated string store_file = 3; // relative to store dir
}
/**
* Special WAL entry used for writing bulk load events to WAL
*/
message BulkLoadDescriptor {
required TableName table_name = 1;
required bytes encoded_region_name = 2;
repeated StoreDescriptor stores = 3;
required int64 bulkload_seq_num = 4;
}
/**
* Special WAL entry to hold all related to a region event (open/close).
*/
@ -132,12 +149,6 @@ message RegionEventDescriptor {
REGION_CLOSE = 1;
}
message StoreDescriptor {
required bytes family_name = 1;
required string store_home_dir = 2; //relative to region dir
repeated string store_file = 3; // relative to store dir
}
required EventType event_type = 1;
required bytes table_name = 2;
required bytes encoded_region_name = 3;
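To make the shape of the new message concrete, here is a rough sketch of walking a parsed BulkLoadDescriptor through the generated Java classes; it assumes desc was recovered from a WAL cell (as in the WALEdit changes further down) and a commons-logging LOG:

  TableName table = ProtobufUtil.toTableName(desc.getTableName());
  String region = desc.getEncodedRegionName().toStringUtf8();
  long seqNum = desc.getBulkloadSeqNum();
  for (StoreDescriptor store : desc.getStoresList()) {
    String family = Bytes.toString(store.getFamilyName().toByteArray());
    // store_file entries are bare file names, relative to the store directory.
    for (String storeFile : store.getStoreFileList()) {
      LOG.debug("Bulk loaded " + table + "/" + region + "/" + family + "/" + storeFile
          + " at seq " + seqNum);
    }
  }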

View File

@ -62,6 +62,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.RandomStringUtils;
import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -129,6 +130,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRespo
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
@ -144,6 +146,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
@ -224,7 +227,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* This is the global default value for durability. All tables/mutations not
* defining a durability or using USE_DEFAULT will default to this value.
*/
private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;
final AtomicBoolean closed = new AtomicBoolean(false);
/* Closing can take some time; use the closing flag if there is stuff we don't
@ -669,7 +672,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.rowProcessorTimeout = conf.getLong(
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
this.durability = htd.getDurability() == Durability.USE_DEFAULT
? DEFAULT_DURABILITY
: htd.getDurability();
if (rsServices != null) {
this.rsAccounting = this.rsServices.getRegionServerAccounting();
@ -758,7 +761,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
private long initializeRegionInternals(final CancelableProgressable reporter,
final MonitoredTask status) throws IOException {
if (coprocessorHost != null) {
status.setStatus("Running coprocessor pre-open hook");
coprocessorHost.preOpen();
@ -835,7 +838,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
throws IOException {
// Load in all the HStores.
long maxSeqId = -1;
@ -1999,8 +2002,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
// no oldestUnflushedSeqId means we flushed all stores.
// or the unflushed stores are all empty.
flushedSeqId = (oldestUnflushedSeqId == HConstants.NO_SEQNUM) ? flushOpSeqId
: oldestUnflushedSeqId - 1;
} else {
// use the provided sequence Id as WAL is not being used for this flush.
flushedSeqId = flushOpSeqId = myseqid;
@ -2263,7 +2266,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return getScanner(scan, null);
}
void prepareScanner(Scan scan) {
if(!scan.hasFamilies()) {
// Adding all families to scanner
for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
@ -3239,7 +3242,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
closeRegionOperation();
}
}
private void doBatchMutate(Mutation mutation) throws IOException {
// Currently this is only called for puts and deletes, so no nonces.
OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
HConstants.NO_NONCE, HConstants.NO_NONCE);
@ -3596,7 +3599,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
protected long replayRecoveredEditsIfAny(final Path regiondir,
Map<byte[], Long> maxSeqIdInStores,
final CancelableProgressable reporter, final MonitoredTask status)
throws IOException {
long minSeqIdForTheRegion = -1;
for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
@ -4102,7 +4105,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return multipleFamilies;
}
/**
* Bulk load one or more HFiles into this region.
*
* @param familyPaths A list which maps column families to the location of the HFile to load
* into that column family.
* @param assignSeqId Force a flush, get its sequenceId to preserve the guarantee that all the
* edits lower than the highest sequential ID from all the HFiles are flushed
* on disk.
* @return true if successful, false if failed recoverably
* @throws IOException if failed unrecoverably.
*/
public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
boolean assignSeqId) throws IOException {
return bulkLoadHFiles(familyPaths, assignSeqId, null);
@ -4112,14 +4125,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* Attempts to atomically load a group of hfiles. This is critical for loading
* rows with multiple column families atomically.
*
* @param familyPaths List of Pair<byte[] column family, String hfilePath>
* @param bulkLoadListener Internal hooks enabling massaging/preparation of a
* file about to be bulk loaded
* @param assignSeqId Force a flush, get its sequenceId to preserve the guarantee that
* all the edits lower than the highest sequential ID from all the
* HFiles are flushed on disk.
* @return true if successful, false if failed recoverably
* @throws IOException if failed unrecoverably.
*/
public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
Preconditions.checkNotNull(familyPaths);
// we need writeLock for multi-family bulk load
startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
@ -4165,7 +4183,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
StringBuilder list = new StringBuilder();
for (Pair<byte[], String> p : failures) {
list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
.append(p.getSecond());
}
// problem when validating
LOG.warn("There was a recoverable bulk load failure likely due to a" +
@ -4173,7 +4191,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return false;
}
// We need to assign a sequential ID that's in between two memstores in order to preserve
// the guarantee that all the edits lower than the highest sequential ID from all the
// HFiles are flushed on disk. See HBASE-10958. The sequence id returned when we flush is
@ -4197,11 +4214,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
Store store = getStore(familyName);
try {
String finalPath = path;
if (bulkLoadListener != null) {
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
}
store.bulkLoadHFile(finalPath, seqId);
if(storeFiles.containsKey(familyName)) {
storeFiles.get(familyName).add(new Path(finalPath));
} else {
List<Path> storeFileNames = new ArrayList<Path>();
storeFileNames.add(new Path(finalPath));
storeFiles.put(familyName, storeFileNames);
}
if (bulkLoadListener != null) {
bulkLoadListener.doneBulkLoad(familyName, path);
}
} catch (IOException ioe) {
@ -4210,20 +4235,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// TODO Need a better story for reverting partial failures due to HDFS.
LOG.error("There was a partial failure due to IO when attempting to" +
" load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
if (bulkLoadListener != null) {
try {
bulkLoadListener.failedBulkLoad(familyName, path);
} catch (Exception ex) {
LOG.error("Error while calling failedBulkLoad for family " +
Bytes.toString(familyName) + " with path " + path, ex);
}
}
throw ioe;
}
}
return true;
} finally {
if (wal != null && !storeFiles.isEmpty()) {
// Write a bulk load event for the hfiles that were loaded, even if the load did not fully succeed
try {
WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
this.getRegionInfo().getTable(),
ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
loadDescriptor, sequenceId);
} catch (IOException ioe) {
if (this.rsServices != null) {
// Have to abort region server because some hfiles have been loaded but we can't write
// the event into WAL
this.rsServices.abort("Failed to write bulk load event into WAL.", ioe);
}
}
}
closeBulkRegionOperation(); closeBulkRegionOperation();
} }
} }
@ -5444,8 +5487,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
doProcessRowWithTimeout(
processor, now, this, null, null, timeout);
processor.postProcess(this, walEdit, true);
} finally {
closeRegionOperation();
}
@ -5564,8 +5605,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// 14. Run post-process hook
processor.postProcess(this, walEdit, walSyncSuccessful);
} finally {
closeRegionOperation();
if (!mutations.isEmpty() &&
@ -5726,8 +5765,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
if (cell.getTagsLength() > 0) {
Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
cell.getTagsOffset(), cell.getTagsLength());
while (i.hasNext()) {
newTags.add(i.next());
}
@ -6638,7 +6677,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
* A mocked list implementation - discards all updates.
*/
private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
@ -6882,7 +6921,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
* Explicitly sync wal
* @throws IOException
*/
public void syncWal() throws IOException {

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
@ -90,6 +91,7 @@ public class WALEdit implements Writable, HeapSize {
static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
static final byte [] FLUSH = Bytes.toBytes("HBASE::FLUSH");
static final byte [] REGION_EVENT = Bytes.toBytes("HBASE::REGION_EVENT");
public static final byte [] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD");
private final int VERSION_2 = -1;
private final boolean isReplay;
@ -294,7 +296,7 @@ public class WALEdit implements Writable, HeapSize {
}
/**
* Create a compaction WALEdit
* @param c
* @return A WALEdit that has <code>c</code> serialized as its value
*/
@ -326,4 +328,33 @@ public class WALEdit implements Writable, HeapSize {
}
return null;
}
/**
* Create a bulk loader WALEdit
*
* @param hri The HRegionInfo for the region in which we are bulk loading
* @param bulkLoadDescriptor The descriptor for the Bulk Loader
* @return The WALEdit for the BulkLoad
*/
public static WALEdit createBulkLoadEvent(HRegionInfo hri,
WALProtos.BulkLoadDescriptor bulkLoadDescriptor) {
KeyValue kv = new KeyValue(getRowForRegion(hri),
METAFAMILY,
BULK_LOAD,
EnvironmentEdgeManager.currentTime(),
bulkLoadDescriptor.toByteArray());
return new WALEdit().add(kv);
}
/**
* Deserializes and returns a BulkLoadDescriptor from the passed-in Cell
* @param cell the key value
* @return deserialized BulkLoadDescriptor or null.
*/
public static WALProtos.BulkLoadDescriptor getBulkLoadDescriptor(Cell cell) throws IOException {
if (CellUtil.matchingColumn(cell, METAFAMILY, BULK_LOAD)) {
return WALProtos.BulkLoadDescriptor.parseFrom(cell.getValue());
}
return null;
}
}
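A sketch of how a WAL consumer (replay or replication, say) might pick these markers out of an edit; it assumes edit is a WALEdit read back from the log and a commons-logging LOG:

  for (Cell cell : edit.getCells()) {
    // Returns null for ordinary cells; non-null only for HBASE::BULK_LOAD marker cells.
    WALProtos.BulkLoadDescriptor desc = WALEdit.getBulkLoadDescriptor(cell);
    if (desc != null) {
      LOG.info("Saw bulk load of " + desc.getStoresCount() + " store(s) in region "
          + desc.getEncodedRegionName().toStringUtf8());
    }
  }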

View File

@ -20,14 +20,17 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
@ -97,5 +100,41 @@ public class WALUtil {
}
return trx;
}
/**
* Write a log marker that a bulk load has succeeded and is about to be committed.
*
* @param wal The log to write into.
* @param htd A description of the table that we are bulk loading into.
* @param info A description of the region in the table that we are bulk loading into.
* @param descriptor A protocol buffers based description of the client's bulk loading request
* @param sequenceId The current sequenceId in the log at the time when we were to write the
* bulk load marker.
* @return The txid of this transaction, or, if there was nothing to do, the last txid.
* @throws IOException We will throw an IOException if we cannot append to the WAL.
*/
public static long writeBulkLoadMarkerAndSync(final WAL wal,
final HTableDescriptor htd,
final HRegionInfo info,
final WALProtos.BulkLoadDescriptor descriptor,
final AtomicLong sequenceId) throws IOException {
TableName tn = info.getTable();
WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
// Add it to the log but the false specifies that we don't need to add it to the memstore
long trx = wal.append(htd,
info,
key,
WALEdit.createBulkLoadEvent(info, descriptor),
sequenceId,
false,
new ArrayList<Cell>());
wal.sync(trx);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(descriptor));
}
return trx;
}
}
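The one caller added in this patch wraps the helper in a try/catch and aborts the region server when the marker cannot be written, since by that point the HFiles are already live in the stores; a condensed sketch of that pattern, with variable names as in the HRegion diff above:

  try {
    WALUtil.writeBulkLoadMarkerAndSync(wal, htableDescriptor, getRegionInfo(),
        loadDescriptor, sequenceId);
  } catch (IOException ioe) {
    // The files are committed but the event is not durable, so abort rather than
    // let WAL replay or replication silently miss the load.
    rsServices.abort("Failed to write bulk load event into WAL.", ioe);
  }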

View File

@ -0,0 +1,312 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.jmock.Expectations;
import org.jmock.integration.junit4.JUnitRuleMockery;
import org.jmock.lib.concurrent.Synchroniser;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* This class attempts to unit test bulk HLog loading.
*/
@Category(SmallTests.class)
public class TestBulkLoad {
@ClassRule
public static TemporaryFolder testFolder = new TemporaryFolder();
@Rule
public final JUnitRuleMockery context = new JUnitRuleMockery() {{
setThreadingPolicy(new Synchroniser());
}};
private final WAL log = context.mock(WAL.class);
private final Configuration conf = HBaseConfiguration.create();
private final Random random = new Random();
private final byte[] randomBytes = new byte[100];
private final byte[] family1 = Bytes.toBytes("family1");
private final byte[] family2 = Bytes.toBytes("family2");
private final Expectations callOnce;
@Rule
public TestName name = new TestName();
public TestBulkLoad() throws IOException {
callOnce = new Expectations() {
{
oneOf(log).append(with(any(HTableDescriptor.class)), with(any(HRegionInfo.class)),
with(any(WALKey.class)), with(bulkLogWalEditType(WALEdit.BULK_LOAD)),
with(any(AtomicLong.class)), with(any(boolean.class)), with(any(List.class)));
will(returnValue(0l));
oneOf(log).sync(with(any(long.class)));
}
};
}
@Before
public void before() throws IOException {
random.nextBytes(randomBytes);
}
@Test
public void verifyBulkLoadEvent() throws IOException {
TableName tableName = TableName.valueOf("test", "test");
List<Pair<byte[], String>> familyPaths = withFamilyPathsFor(family1);
byte[] familyName = familyPaths.get(0).getFirst();
String storeFileName = familyPaths.get(0).getSecond();
storeFileName = (new Path(storeFileName)).getName();
List<String> storeFileNames = new ArrayList<String>();
storeFileNames.add(storeFileName);
final Matcher<WALEdit> bulkEventMatcher = bulkLogWalEdit(WALEdit.BULK_LOAD,
tableName.toBytes(), familyName, storeFileNames);
Expectations expectation = new Expectations() {
{
oneOf(log).append(with(any(HTableDescriptor.class)), with(any(HRegionInfo.class)),
with(any(WALKey.class)), with(bulkEventMatcher),
with(any(AtomicLong.class)), with(any(boolean.class)), with(any(List.class)));
will(returnValue(0l));
oneOf(log).sync(with(any(long.class)));
}
};
context.checking(expectation);
testRegionWithFamiliesAndSpecifiedTableName(tableName, family1)
.bulkLoadHFiles(familyPaths, false);
}
@Test
public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOException {
testRegionWithFamilies(family1).bulkLoadHFiles(new ArrayList<Pair<byte[], String>>(), false);
}
@Test
public void shouldBulkLoadSingleFamilyHLog() throws IOException {
context.checking(callOnce);
testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false);
}
@Test
public void shouldBulkLoadManyFamilyHLog() throws IOException {
context.checking(callOnce);
testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2),
false);
}
@Test
public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
context.checking(callOnce);
TableName tableName = TableName.valueOf("test", "test");
testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2)
.bulkLoadHFiles(withFamilyPathsFor(family1, family2), false);
}
@Test(expected = DoNotRetryIOException.class)
public void shouldCrashIfBulkLoadFamiliesNotInTable() throws IOException {
testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false);
}
@Test(expected = DoNotRetryIOException.class)
public void bulkHLogShouldThrowErrorWhenFamilySpecifiedAndHFileExistsButNotInTableDescriptor()
throws IOException {
testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false);
}
@Test(expected = DoNotRetryIOException.class)
public void shouldThrowErrorIfBadFamilySpecifiedAsFamilyPath() throws IOException {
testRegionWithFamilies()
.bulkLoadHFiles(asList(withInvalidColumnFamilyButProperHFileLocation(family1)),
false);
}
@Test(expected = FileNotFoundException.class)
public void shouldThrowErrorIfHFileDoesNotExist() throws IOException {
List<Pair<byte[], String>> list = asList(withMissingHFileForFamily(family1));
testRegionWithFamilies(family1).bulkLoadHFiles(list, false);
}
private Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
return new Pair<byte[], String>(family, "/tmp/does_not_exist");
}
private Pair<byte[], String> withInvalidColumnFamilyButProperHFileLocation(byte[] family)
throws IOException {
createHFileForFamilies(family);
return new Pair<byte[], String>(new byte[]{0x00, 0x01, 0x02}, "/tmp/does_not_exist");
}
private HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
byte[]... families)
throws IOException {
HRegionInfo hRegionInfo = new HRegionInfo(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
for (byte[] family : families) {
hTableDescriptor.addFamily(new HColumnDescriptor(family));
}
// TODO We need a way to do this without creating files
return HRegion.createHRegion(hRegionInfo,
new Path(testFolder.newFolder().toURI()),
conf,
hTableDescriptor,
log);
}
private HRegion testRegionWithFamilies(byte[]... families) throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName());
return testRegionWithFamiliesAndSpecifiedTableName(tableName, families);
}
private List<Pair<byte[], String>> getBlankFamilyPaths(){
return new ArrayList<Pair<byte[], String>>();
}
private List<Pair<byte[], String>> withFamilyPathsFor(byte[]... families) throws IOException {
List<Pair<byte[], String>> familyPaths = getBlankFamilyPaths();
for (byte[] family : families) {
familyPaths.add(new Pair<byte[], String>(family, createHFileForFamilies(family)));
}
return familyPaths;
}
private String createHFileForFamilies(byte[] family) throws IOException {
HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
// TODO We need a way to do this without creating files
File hFileLocation = testFolder.newFile();
hFileFactory.withOutputStream(new FSDataOutputStream(new FileOutputStream(hFileLocation)));
hFileFactory.withFileContext(new HFileContext());
HFile.Writer writer = hFileFactory.create();
writer.append(new KeyValue(CellUtil.createCell(randomBytes,
family,
randomBytes,
0l,
KeyValue.Type.Put.getCode(),
randomBytes)));
writer.close();
return hFileLocation.getAbsoluteFile().getAbsolutePath();
}
private static Matcher<WALEdit> bulkLogWalEditType(byte[] typeBytes) {
return new WalMatcher(typeBytes);
}
private static Matcher<WALEdit> bulkLogWalEdit(byte[] typeBytes, byte[] tableName,
byte[] familyName, List<String> storeFileNames) {
return new WalMatcher(typeBytes, tableName, familyName, storeFileNames);
}
private static class WalMatcher extends TypeSafeMatcher<WALEdit> {
private final byte[] typeBytes;
private final byte[] tableName;
private final byte[] familyName;
private final List<String> storeFileNames;
public WalMatcher(byte[] typeBytes) {
this(typeBytes, null, null, null);
}
public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName,
List<String> storeFileNames) {
this.typeBytes = typeBytes;
this.tableName = tableName;
this.familyName = familyName;
this.storeFileNames = storeFileNames;
}
@Override
protected boolean matchesSafely(WALEdit item) {
assertTrue(Arrays.equals(item.getCells().get(0).getQualifier(), typeBytes));
BulkLoadDescriptor desc;
try {
desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0));
} catch (IOException e) {
return false;
}
assertNotNull(desc);
if (tableName != null) {
assertTrue(Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(),
tableName));
}
if(storeFileNames != null) {
int index=0;
StoreDescriptor store = desc.getStores(0);
assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
assertEquals(storeFileNames.size(), store.getStoreFileCount());
for (String storeFile : store.getStoreFileList()) {
assertTrue(storeFile.equals(storeFileNames.get(index++)));
}
}
return true;
}
@Override
public void describeTo(Description description) {
}
}
}

View File

@ -124,7 +124,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;

View File

@ -17,28 +17,26 @@
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -55,13 +53,26 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
/**
* Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
* the region server's bulkLoad functionality.
@ -288,7 +299,11 @@ public class TestHRegionServerBulkLoad {
UTIL.startMiniCluster(1);
try {
WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
FindBulkHBaseListener listener = new FindBulkHBaseListener();
log.registerWALActionsListener(listener);
runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
assertThat(listener.isFound(), is(true));
} finally {
UTIL.shutdownMiniCluster();
}
@ -344,5 +359,25 @@ public class TestHRegionServerBulkLoad {
UTIL = new HBaseTestingUtility(c);
}
static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
private boolean found = false;
@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
for (Cell cell : logEdit.getCells()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
for (Map.Entry entry : kv.toStringMap().entrySet()) {
if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
found = true;
}
}
}
}
public boolean isFound() {
return found;
}
}
}

View File

@ -124,7 +124,7 @@ public class TestWALActionsListener {
/**
* Just counts when methods are called
*/
public static class DummyWALActionsListener extends WALActionsListener.Base {
public int preLogRollCounter = 0;
public int postLogRollCounter = 0;
public int closedCount = 0;

pom.xml
View File

@ -1027,6 +1027,7 @@
<jetty.version>6.1.26</jetty.version>
<jetty.jspapi.version>6.1.14</jetty.jspapi.version>
<jersey.version>1.9</jersey.version>
<jmock-junit4.version>2.6.0</jmock-junit4.version>
<jruby.version>1.6.8</jruby.version>
<junit.version>4.11</junit.version>
<hamcrest.version>1.3</hamcrest.version>
@ -1545,6 +1546,18 @@
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>
<dependency>
<groupId>org.jmock</groupId>
<artifactId>jmock-junit4</artifactId>
<version>${jmock-junit4.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>junit-dep</artifactId>
<groupId>junit</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Dependencies needed by subprojects -->
@ -1568,6 +1581,10 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>org.jmock</groupId>
<artifactId>jmock-junit4</artifactId>
</dependency>
</dependencies>
<!--
To publish, use the following settings.xml file ( placed in ~/.m2/settings.xml )