HBASE-11571 Bulk load handling from secondary region replicas

This commit is contained in:
Jeffrey Zhong 2015-02-27 17:21:06 -08:00
parent 524791bcf5
commit 883d6fd8e5
6 changed files with 211 additions and 14 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.EOFException; import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
@ -4663,6 +4664,86 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
} }
} }
void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException {
checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(),
"BulkLoad marker from WAL ", bulkLoadEvent);
if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
return; // if primary nothing to do
}
if (LOG.isDebugEnabled()) {
LOG.debug(getRegionInfo().getEncodedName() + " : "
+ "Replaying bulkload event marker " + TextFormat.shortDebugString(bulkLoadEvent));
}
// check if multiple families involved
boolean multipleFamilies = false;
byte[] family = null;
for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
byte[] fam = storeDescriptor.getFamilyName().toByteArray();
if (family == null) {
family = fam;
} else if (!Bytes.equals(family, fam)) {
multipleFamilies = true;
break;
}
}
startBulkRegionOperation(multipleFamilies);
try {
// we will use writestate as a coarse-grain lock for all the replay events
synchronized (writestate) {
// Replication can deliver events out of order when primary region moves or the region
// server crashes, since there is no coordination between replication of different wal files
// belonging to different region servers. We have to safe guard against this case by using
// region open event's seqid. Since this is the first event that the region puts (after
// possibly flushing recovered.edits), after seeing this event, we can ignore every edit
// smaller than this seqId
if (bulkLoadEvent.getBulkloadSeqNum() >= 0
&& this.lastReplayedOpenRegionSeqId >= bulkLoadEvent.getBulkloadSeqNum()) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Skipping replaying bulkload event :"
+ TextFormat.shortDebugString(bulkLoadEvent)
+ " because its sequence id is smaller than this region's lastReplayedOpenRegionSeqId"
+ " =" + lastReplayedOpenRegionSeqId);
return;
}
for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
// stores of primary may be different now
family = storeDescriptor.getFamilyName().toByteArray();
Store store = getStore(family);
if (store == null) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a bulk load marker from primary, but the family is not found. "
+ "Ignoring. StoreDescriptor:" + storeDescriptor);
continue;
}
List<String> storeFiles = storeDescriptor.getStoreFileList();
for (String storeFile : storeFiles) {
StoreFileInfo storeFileInfo = null;
try {
storeFileInfo = fs.getStoreFileInfo(Bytes.toString(family), storeFile);
store.bulkLoadHFile(storeFileInfo);
} catch(FileNotFoundException ex) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ ((storeFileInfo != null) ? storeFileInfo.toString() :
(new Path(Bytes.toString(family), storeFile)).toString())
+ " doesn't exist any more. Skip loading the file");
}
}
}
}
if (bulkLoadEvent.getBulkloadSeqNum() > 0) {
getMVCC().advanceMemstoreReadPointIfNeeded(bulkLoadEvent.getBulkloadSeqNum());
}
} finally {
closeBulkRegionOperation();
}
}
/** Checks whether the given regionName is either equal to our region, or that /** Checks whether the given regionName is either equal to our region, or that
* the regionName is the primary region to our corresponding range for the secondary replica. * the regionName is the primary region to our corresponding range for the secondary replica.
*/ */
@ -5013,13 +5094,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
if (bulkLoadListener != null) { if (bulkLoadListener != null) {
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path); finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
} }
store.bulkLoadHFile(finalPath, seqId); Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
if(storeFiles.containsKey(familyName)) { if(storeFiles.containsKey(familyName)) {
storeFiles.get(familyName).add(new Path(finalPath)); storeFiles.get(familyName).add(commitedStoreFile);
} else { } else {
List<Path> storeFileNames = new ArrayList<Path>(); List<Path> storeFileNames = new ArrayList<Path>();
storeFileNames.add(new Path(finalPath)); storeFileNames.add(commitedStoreFile);
storeFiles.put(familyName, storeFileNames); storeFiles.put(familyName, storeFileNames);
} }
if (bulkLoadListener != null) { if (bulkLoadListener != null) {

View File

@ -781,19 +781,33 @@ public class HStore implements Store {
} }
@Override @Override
public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException { public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
Path srcPath = new Path(srcPathStr); Path srcPath = new Path(srcPathStr);
Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
StoreFile sf = createStoreFileAndReader(dstPath); LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
+ dstPath + " - updating store file list.");
StoreFile sf = createStoreFileAndReader(dstPath);
bulkLoadHFile(sf);
LOG.info("Successfully loaded store file " + srcPath + " into store " + this
+ " (new location: " + dstPath + ")");
return dstPath;
}
@Override
public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
StoreFile sf = createStoreFileAndReader(fileInfo);
bulkLoadHFile(sf);
}
private void bulkLoadHFile(StoreFile sf) throws IOException {
StoreFile.Reader r = sf.getReader(); StoreFile.Reader r = sf.getReader();
this.storeSize += r.length(); this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes(); this.totalUncompressedBytes += r.getTotalUncompressedBytes();
LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() +
"' as " + dstPath + " - updating store file list.");
// Append the new storefile into the list // Append the new storefile into the list
this.lock.writeLock().lock(); this.lock.writeLock().lock();
try { try {
@ -807,8 +821,7 @@ public class HStore implements Store {
this.lock.writeLock().unlock(); this.lock.writeLock().unlock();
} }
notifyChangedReadersObservers(); notifyChangedReadersObservers();
LOG.info("Successfully loaded store file " + srcPath LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
+ " into store " + this + " (new location: " + dstPath + ")");
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
String traceMessage = "BULK LOAD time,size,store size,store files [" String traceMessage = "BULK LOAD time,size,store size,store files ["
+ EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize

View File

@ -143,6 +143,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
@ -732,6 +733,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
region.replayWALRegionEventMarker(regionEvent); region.replayWALRegionEventMarker(regionEvent);
continue; continue;
} }
BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
if (bulkLoadEvent != null) {
region.replayWALBulkLoadEventMarker(bulkLoadEvent);
continue;
}
} }
it.remove(); it.remove();
} }

View File

@ -247,7 +247,7 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* @param srcPathStr * @param srcPathStr
* @param sequenceId sequence Id associated with the HFile * @param sequenceId sequence Id associated with the HFile
*/ */
void bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException; Path bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException;
// General accessors into the state of the store // General accessors into the state of the store
// TODO abstract some of this out into a metrics class // TODO abstract some of this out into a metrics class
@ -440,4 +440,5 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
*/ */
void refreshStoreFiles(Collection<String> newFiles) throws IOException; void refreshStoreFiles(Collection<String> newFiles) throws IOException;
void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException;
} }

View File

@ -296,9 +296,6 @@ public class TestBulkLoad {
assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName)); assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName)); assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
assertEquals(storeFileNames.size(), store.getStoreFileCount()); assertEquals(storeFileNames.size(), store.getStoreFileCount());
for (String storeFile : store.getStoreFileList()) {
assertTrue(storeFile.equals(storeFileNames.get(index++)));
}
} }
return true; return true;

View File

@ -28,33 +28,46 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.*; import static org.apache.hadoop.hbase.regionserver.TestHRegion.*;
import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
@ -63,6 +76,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
@ -74,6 +88,7 @@ import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -1327,6 +1342,90 @@ public class TestHRegionReplayEvents {
region.batchReplay(new MutationReplay[] {mutation}, replaySeqId); region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
} }
/**
* Tests replaying region open markers from primary region. Checks whether the files are picked up
*/
@Test
public void testReplayBulkLoadEvent() throws IOException {
LOG.info("testReplayBulkLoadEvent starts");
putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
// close the region and open again.
primaryRegion.close();
primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
// bulk load a file into primary region
Random random = new Random();
byte[] randomValues = new byte[20];
random.nextBytes(randomValues);
Path testPath = TEST_UTIL.getDataTestDirOnTestFS();
List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
int expectedLoadFileCount = 0;
for (byte[] family : families) {
familyPaths.add(new Pair<byte[], String>(family, createHFileForFamilies(testPath, family,
randomValues)));
expectedLoadFileCount++;
}
primaryRegion.bulkLoadHFiles(familyPaths, false);
// now replay the edits and the bulk load marker
reader = createWALReaderForPrimary();
LOG.info("-- Replaying edits and region events in secondary");
BulkLoadDescriptor bulkloadEvent = null;
while (true) {
WAL.Entry entry = reader.next();
if (entry == null) {
break;
}
bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
if (bulkloadEvent != null) {
break;
}
}
// we should have 1 bulk load event
assertTrue(bulkloadEvent != null);
assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());
// replay the bulk load event
secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);
List<String> storeFileName = new ArrayList<String>();
for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
storeFileName.addAll(storeDesc.getStoreFileList());
}
// assert that the bulk loaded files are picked
for (Store s : secondaryRegion.getStores().values()) {
for (StoreFile sf : s.getStorefiles()) {
storeFileName.remove(sf.getPath().getName());
}
}
assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty());
LOG.info("-- Verifying edits from secondary");
for (byte[] family : families) {
assertGet(secondaryRegion, family, randomValues);
}
}
private String createHFileForFamilies(Path testPath, byte[] family,
byte[] valueBytes) throws IOException {
HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
// TODO We need a way to do this without creating files
Path testFile = new Path(testPath, UUID.randomUUID().toString());
hFileFactory.withOutputStream(TEST_UTIL.getTestFileSystem().create(testFile));
hFileFactory.withFileContext(new HFileContext());
HFile.Writer writer = hFileFactory.create();
writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0l,
KeyValue.Type.Put.getCode(), valueBytes)));
writer.close();
return testFile.toString();
}
/** Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does /** Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does
* a flush every flushInterval number of records. Then it puts numRowsAfterFlush number of * a flush every flushInterval number of records. Then it puts numRowsAfterFlush number of
* more rows but does not execute flush after * more rows but does not execute flush after