HBASE-24474 Rename LocalRegion to MasterRegion (#1811)
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
716702a349
commit
bad2d4e409
|
@ -133,6 +133,8 @@ import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
|
|||
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegion;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
|
||||
import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
|
||||
|
@ -144,7 +146,6 @@ import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStat
|
|||
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
|
||||
import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.master.store.LocalStore;
|
||||
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
|
||||
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
|
||||
import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
|
||||
|
@ -450,7 +451,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
private ProcedureStore procedureStore;
|
||||
|
||||
// the master local storage to store procedure data, etc.
|
||||
private LocalStore localStore;
|
||||
private MasterRegion masterRegion;
|
||||
|
||||
// handle table states
|
||||
private TableStateManager tableStateManager;
|
||||
|
@ -964,8 +965,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
this.splitWALManager = new SplitWALManager(this);
|
||||
}
|
||||
|
||||
// initialize local store
|
||||
localStore = LocalStore.create(this);
|
||||
// initialize master local region
|
||||
masterRegion = MasterRegionFactory.create(this);
|
||||
createProcedureExecutor();
|
||||
Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType =
|
||||
procedureExecutor.getActiveProceduresNoCopy().stream()
|
||||
|
@ -1543,8 +1544,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
|
||||
stopProcedureExecutor();
|
||||
|
||||
if (localStore != null) {
|
||||
localStore.close(isAborted());
|
||||
if (masterRegion != null) {
|
||||
masterRegion.close(isAborted());
|
||||
}
|
||||
if (this.walManager != null) {
|
||||
this.walManager.stop();
|
||||
|
@ -1563,7 +1564,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
private void createProcedureExecutor() throws IOException {
|
||||
MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
|
||||
procedureStore =
|
||||
new RegionProcedureStore(this, localStore, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
|
||||
new RegionProcedureStore(this, masterRegion, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
|
||||
procedureStore.registerListener(new ProcedureStoreListener() {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
import org.apache.hadoop.hbase.io.HFileLink;
|
||||
import org.apache.hadoop.hbase.master.store.LocalStore;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.util.StealJobQueue;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -161,7 +161,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
|
|||
protected boolean validate(Path file) {
|
||||
return HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent()) ||
|
||||
StoreFileInfo.validateStoreFileName(file.getName()) ||
|
||||
file.getName().endsWith(LocalStore.ARCHIVED_HFILE_SUFFIX);
|
||||
file.getName().endsWith(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -35,7 +35,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
|
||||
import org.apache.hadoop.hbase.master.store.LocalStore;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -89,7 +89,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
|
|||
protected boolean validate(Path file) {
|
||||
return AbstractFSWALProvider.validateWALFilename(file.getName()) ||
|
||||
MasterProcedureUtil.validateProcedureWALFilename(file.getName()) ||
|
||||
file.getName().endsWith(LocalStore.ARCHIVED_WAL_SUFFIX);
|
||||
file.getName().endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.master.cleaner;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.master.store.LocalStore;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -42,7 +42,7 @@ public class TimeToLiveMasterLocalStoreHFileCleaner extends BaseTimeToLiveFileCl
|
|||
|
||||
@Override
|
||||
protected boolean valiateFilename(Path file) {
|
||||
return file.getName().endsWith(LocalStore.ARCHIVED_HFILE_SUFFIX);
|
||||
return file.getName().endsWith(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.master.cleaner;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.master.store.LocalStore;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -42,6 +42,6 @@ public class TimeToLiveMasterLocalStoreWALCleaner extends BaseTimeToLiveFileClea
|
|||
|
||||
@Override
|
||||
protected boolean valiateFilename(Path file) {
|
||||
return file.getName().endsWith(LocalStore.ARCHIVED_WAL_SUFFIX);
|
||||
return file.getName().endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.store;
|
||||
package org.apache.hadoop.hbase.master.region;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
|
||||
|
||||
|
@ -54,7 +54,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
|||
import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
|
||||
|
||||
/**
|
||||
* A region that stores data in a separated directory.
|
||||
* A region that stores data in a separated directory, which can be used to store master local data.
|
||||
* <p/>
|
||||
* FileSystem layout:
|
||||
*
|
||||
|
@ -79,14 +79,14 @@ import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
|
|||
* Notice that, you can use different root file system and WAL file system. Then the above directory
|
||||
* will be on two file systems, the root file system will have the data directory while the WAL
|
||||
* filesystem will have the WALs directory. The archived HFile will be moved to the global HFile
|
||||
* archived directory with the {@link LocalRegionParams#archivedWalSuffix()} suffix. The archived
|
||||
* archived directory with the {@link MasterRegionParams#archivedWalSuffix()} suffix. The archived
|
||||
* WAL will be moved to the global WAL archived directory with the
|
||||
* {@link LocalRegionParams#archivedHFileSuffix()} suffix.
|
||||
* {@link MasterRegionParams#archivedHFileSuffix()} suffix.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class LocalRegion {
|
||||
public final class MasterRegion {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LocalRegion.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MasterRegion.class);
|
||||
|
||||
private static final String REPLAY_EDITS_DIR = "recovered.wals";
|
||||
|
||||
|
@ -100,12 +100,12 @@ public final class LocalRegion {
|
|||
final HRegion region;
|
||||
|
||||
@VisibleForTesting
|
||||
final LocalRegionFlusherAndCompactor flusherAndCompactor;
|
||||
final MasterRegionFlusherAndCompactor flusherAndCompactor;
|
||||
|
||||
private LocalRegionWALRoller walRoller;
|
||||
private MasterRegionWALRoller walRoller;
|
||||
|
||||
private LocalRegion(HRegion region, WALFactory walFactory,
|
||||
LocalRegionFlusherAndCompactor flusherAndCompactor, LocalRegionWALRoller walRoller) {
|
||||
private MasterRegion(HRegion region, WALFactory walFactory,
|
||||
MasterRegionFlusherAndCompactor flusherAndCompactor, MasterRegionWALRoller walRoller) {
|
||||
this.region = region;
|
||||
this.walFactory = walFactory;
|
||||
this.flusherAndCompactor = flusherAndCompactor;
|
||||
|
@ -128,7 +128,7 @@ public final class LocalRegion {
|
|||
}
|
||||
}
|
||||
|
||||
public void update(UpdateLocalRegion action) throws IOException {
|
||||
public void update(UpdateMasterRegion action) throws IOException {
|
||||
action.update(region);
|
||||
flusherAndCompactor.onUpdate();
|
||||
}
|
||||
|
@ -142,17 +142,17 @@ public final class LocalRegion {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
FlushResult flush(boolean force) throws IOException {
|
||||
public FlushResult flush(boolean force) throws IOException {
|
||||
return region.flush(force);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void requestRollAll() {
|
||||
public void requestRollAll() {
|
||||
walRoller.requestRollAll();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void waitUntilWalRollFinished() throws InterruptedException {
|
||||
public void waitUntilWalRollFinished() throws InterruptedException {
|
||||
walRoller.waitUntilWalRollFinished();
|
||||
}
|
||||
|
||||
|
@ -176,7 +176,7 @@ public final class LocalRegion {
|
|||
}
|
||||
}
|
||||
|
||||
private static WAL createWAL(WALFactory walFactory, LocalRegionWALRoller walRoller,
|
||||
private static WAL createWAL(WALFactory walFactory, MasterRegionWALRoller walRoller,
|
||||
String serverName, FileSystem walFs, Path walRootDir, RegionInfo regionInfo)
|
||||
throws IOException {
|
||||
String logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
|
||||
|
@ -197,7 +197,7 @@ public final class LocalRegion {
|
|||
|
||||
private static HRegion bootstrap(Configuration conf, TableDescriptor td, FileSystem fs,
|
||||
Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
|
||||
LocalRegionWALRoller walRoller, String serverName) throws IOException {
|
||||
MasterRegionWALRoller walRoller, String serverName) throws IOException {
|
||||
TableName tn = td.getTableName();
|
||||
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setRegionId(REGION_ID).build();
|
||||
Path tmpTableDir = CommonFSUtils.getTableDir(rootDir,
|
||||
|
@ -215,7 +215,7 @@ public final class LocalRegion {
|
|||
}
|
||||
|
||||
private static HRegion open(Configuration conf, TableDescriptor td, FileSystem fs, Path rootDir,
|
||||
FileSystem walFs, Path walRootDir, WALFactory walFactory, LocalRegionWALRoller walRoller,
|
||||
FileSystem walFs, Path walRootDir, WALFactory walFactory, MasterRegionWALRoller walRoller,
|
||||
String serverName) throws IOException {
|
||||
Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
|
||||
Path regionDir =
|
||||
|
@ -269,7 +269,7 @@ public final class LocalRegion {
|
|||
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
|
||||
}
|
||||
|
||||
public static LocalRegion create(LocalRegionParams params) throws IOException {
|
||||
public static MasterRegion create(MasterRegionParams params) throws IOException {
|
||||
TableDescriptor td = params.tableDescriptor();
|
||||
LOG.info("Create or load local region for table " + td);
|
||||
Server server = params.server();
|
||||
|
@ -284,18 +284,21 @@ public final class LocalRegion {
|
|||
Configuration conf = new Configuration(baseConf);
|
||||
CommonFSUtils.setRootDir(conf, rootDir);
|
||||
CommonFSUtils.setWALRootDir(conf, walRootDir);
|
||||
LocalRegionFlusherAndCompactor.setupConf(conf, params.flushSize(), params.flushPerChanges(),
|
||||
MasterRegionFlusherAndCompactor.setupConf(conf, params.flushSize(), params.flushPerChanges(),
|
||||
params.flushIntervalMs());
|
||||
conf.setInt(AbstractFSWAL.MAX_LOGS, params.maxWals());
|
||||
if (params.useHsync() != null) {
|
||||
conf.setBoolean(HRegion.WAL_HSYNC_CONF_KEY, params.useHsync());
|
||||
}
|
||||
if (params.useMetaCellComparator() != null) {
|
||||
conf.setBoolean(HRegion.USE_META_CELL_COMPARATOR, params.useMetaCellComparator());
|
||||
}
|
||||
conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT,
|
||||
IntMath.ceilingPowerOfTwo(params.ringBufferSlotCount()));
|
||||
|
||||
LocalRegionWALRoller walRoller = LocalRegionWALRoller.create(td.getTableName() + "-WAL-Roller",
|
||||
conf, server, walFs, walRootDir, globalWALRootDir, params.archivedWalSuffix(),
|
||||
params.rollPeriodMs(), params.flushSize());
|
||||
MasterRegionWALRoller walRoller = MasterRegionWALRoller.create(
|
||||
td.getTableName() + "-WAL-Roller", conf, server, walFs, walRootDir, globalWALRootDir,
|
||||
params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize());
|
||||
walRoller.start();
|
||||
|
||||
WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), false);
|
||||
|
@ -311,7 +314,7 @@ public final class LocalRegion {
|
|||
server.getServerName().toString());
|
||||
}
|
||||
Path globalArchiveDir = HFileArchiveUtil.getArchivePath(baseConf);
|
||||
LocalRegionFlusherAndCompactor flusherAndCompactor = new LocalRegionFlusherAndCompactor(conf,
|
||||
MasterRegionFlusherAndCompactor flusherAndCompactor = new MasterRegionFlusherAndCompactor(conf,
|
||||
server, region, params.flushSize(), params.flushPerChanges(), params.flushIntervalMs(),
|
||||
params.compactMin(), globalArchiveDir, params.archivedHFileSuffix());
|
||||
walRoller.setFlusherAndCompactor(flusherAndCompactor);
|
||||
|
@ -320,6 +323,6 @@ public final class LocalRegion {
|
|||
LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will" +
|
||||
" be created again when we actually archive the hfiles later, so continue", archiveDir);
|
||||
}
|
||||
return new LocalRegion(region, walFactory, flusherAndCompactor, walRoller);
|
||||
return new MasterRegion(region, walFactory, flusherAndCompactor, walRoller);
|
||||
}
|
||||
}
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.store;
|
||||
package org.apache.hadoop.hbase.master.region;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -23,23 +23,16 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Used for storing data at master side. The data will be stored in a {@link LocalRegion}.
|
||||
* The factory class for creating a {@link MasterRegion}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class LocalStore {
|
||||
public final class MasterRegionFactory {
|
||||
|
||||
// Use the character $ to let the log cleaner know that this is not the normal wal file.
|
||||
public static final String ARCHIVED_WAL_SUFFIX = "$masterlocalwal$";
|
||||
|
@ -89,45 +82,8 @@ public final class LocalStore {
|
|||
private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY)).build();
|
||||
|
||||
private final LocalRegion region;
|
||||
|
||||
private LocalStore(LocalRegion region) {
|
||||
this.region = region;
|
||||
}
|
||||
|
||||
public void update(UpdateLocalRegion action) throws IOException {
|
||||
region.update(action);
|
||||
}
|
||||
|
||||
public Result get(Get get) throws IOException {
|
||||
return region.get(get);
|
||||
}
|
||||
|
||||
public RegionScanner getScanner(Scan scan) throws IOException {
|
||||
return region.getScanner(scan);
|
||||
}
|
||||
|
||||
public void close(boolean abort) {
|
||||
region.close(abort);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public FlushResult flush(boolean force) throws IOException {
|
||||
return region.flush(force);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void requestRollAll() {
|
||||
region.requestRollAll();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void waitUntilWalRollFinished() throws InterruptedException {
|
||||
region.waitUntilWalRollFinished();
|
||||
}
|
||||
|
||||
public static LocalStore create(Server server) throws IOException {
|
||||
LocalRegionParams params = new LocalRegionParams().server(server)
|
||||
public static MasterRegion create(Server server) throws IOException {
|
||||
MasterRegionParams params = new MasterRegionParams().server(server)
|
||||
.regionDirName(MASTER_STORE_DIR).tableDescriptor(TABLE_DESC);
|
||||
Configuration conf = server.getConfiguration();
|
||||
long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
|
||||
|
@ -145,7 +101,6 @@ public final class LocalStore {
|
|||
long rollPeriodMs = conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS);
|
||||
params.rollPeriodMs(rollPeriodMs).archivedWalSuffix(ARCHIVED_WAL_SUFFIX)
|
||||
.archivedHFileSuffix(ARCHIVED_HFILE_SUFFIX);
|
||||
LocalRegion region = LocalRegion.create(params);
|
||||
return new LocalStore(region);
|
||||
return MasterRegion.create(params);
|
||||
}
|
||||
}
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.store;
|
||||
package org.apache.hadoop.hbase.master.region;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
@ -45,8 +45,8 @@ import org.slf4j.LoggerFactory;
|
|||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* As long as there is no RegionServerServices for a 'local' region, we need implement the flush and
|
||||
* compaction logic by our own.
|
||||
* As long as there is no RegionServerServices for a master local region, we need implement the
|
||||
* flush and compaction logic by our own.
|
||||
* <p/>
|
||||
* The flush logic is very simple, every time after calling a modification method in
|
||||
* {@link RegionProcedureStore}, we will call the {@link #onUpdate()} method below, and in this
|
||||
|
@ -57,9 +57,9 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
|
|||
* count, if it is above the compactMin, we will do a major compaction.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class LocalRegionFlusherAndCompactor implements Closeable {
|
||||
class MasterRegionFlusherAndCompactor implements Closeable {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LocalRegionFlusherAndCompactor.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MasterRegionFlusherAndCompactor.class);
|
||||
|
||||
private final Configuration conf;
|
||||
|
||||
|
@ -101,7 +101,7 @@ class LocalRegionFlusherAndCompactor implements Closeable {
|
|||
|
||||
private volatile boolean closed = false;
|
||||
|
||||
LocalRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region,
|
||||
MasterRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region,
|
||||
long flushSize, long flushPerChanges, long flushIntervalMs, int compactMin,
|
||||
Path globalArchivePath, String archivedHFileSuffix) {
|
||||
this.conf = conf;
|
||||
|
@ -142,7 +142,7 @@ class LocalRegionFlusherAndCompactor implements Closeable {
|
|||
Path globalStoreArchiveDir = HFileArchiveUtil.getStoreArchivePathForArchivePath(
|
||||
globalArchivePath, region.getRegionInfo(), store.getColumnFamilyDescriptor().getName());
|
||||
try {
|
||||
LocalRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir,
|
||||
MasterRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir,
|
||||
archivedHFileSuffix);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to move archived hfiles from {} to global dir {}", storeArchiveDir,
|
||||
|
@ -156,7 +156,7 @@ class LocalRegionFlusherAndCompactor implements Closeable {
|
|||
region.compact(true);
|
||||
moveHFileToGlobalArchiveDir();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to compact procedure store region", e);
|
||||
LOG.error("Failed to compact master local region", e);
|
||||
}
|
||||
compactLock.lock();
|
||||
try {
|
||||
|
@ -207,8 +207,8 @@ class LocalRegionFlusherAndCompactor implements Closeable {
|
|||
region.flush(true);
|
||||
lastFlushTime = EnvironmentEdgeManager.currentTime();
|
||||
} catch (IOException e) {
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to flush procedure store region, aborting...", e);
|
||||
abortable.abort("Failed to flush procedure store region", e);
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to flush master local region, aborting...", e);
|
||||
abortable.abort("Failed to flush master local region", e);
|
||||
return;
|
||||
}
|
||||
compactLock.lock();
|
|
@ -15,17 +15,17 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.store;
|
||||
package org.apache.hadoop.hbase.master.region;
|
||||
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* The parameters for constructing {@link LocalRegion}.
|
||||
* The parameters for constructing {@link MasterRegion}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class LocalRegionParams {
|
||||
public class MasterRegionParams {
|
||||
|
||||
private Server server;
|
||||
|
||||
|
@ -53,71 +53,78 @@ public class LocalRegionParams {
|
|||
|
||||
private String archivedHFileSuffix;
|
||||
|
||||
public LocalRegionParams server(Server server) {
|
||||
private Boolean useMetaCellComparator;
|
||||
|
||||
public MasterRegionParams server(Server server) {
|
||||
this.server = server;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalRegionParams regionDirName(String regionDirName) {
|
||||
public MasterRegionParams regionDirName(String regionDirName) {
|
||||
this.regionDirName = regionDirName;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalRegionParams tableDescriptor(TableDescriptor tableDescriptor) {
|
||||
public MasterRegionParams tableDescriptor(TableDescriptor tableDescriptor) {
|
||||
this.tableDescriptor = tableDescriptor;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalRegionParams flushSize(long flushSize) {
|
||||
public MasterRegionParams flushSize(long flushSize) {
|
||||
this.flushSize = flushSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalRegionParams flushPerChanges(long flushPerChanges) {
|
||||
public MasterRegionParams flushPerChanges(long flushPerChanges) {
|
||||
this.flushPerChanges = flushPerChanges;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalRegionParams flushIntervalMs(long flushIntervalMs) {
|
||||
public MasterRegionParams flushIntervalMs(long flushIntervalMs) {
|
||||
this.flushIntervalMs = flushIntervalMs;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalRegionParams compactMin(int compactMin) {
|
||||
public MasterRegionParams compactMin(int compactMin) {
|
||||
this.compactMin = compactMin;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalRegionParams maxWals(int maxWals) {
|
||||
public MasterRegionParams maxWals(int maxWals) {
|
||||
this.maxWals = maxWals;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalRegionParams useHsync(boolean useHsync) {
|
||||
public MasterRegionParams useHsync(boolean useHsync) {
|
||||
this.useHsync = useHsync;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalRegionParams ringBufferSlotCount(int ringBufferSlotCount) {
|
||||
public MasterRegionParams ringBufferSlotCount(int ringBufferSlotCount) {
|
||||
this.ringBufferSlotCount = ringBufferSlotCount;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalRegionParams rollPeriodMs(long rollPeriodMs) {
|
||||
public MasterRegionParams rollPeriodMs(long rollPeriodMs) {
|
||||
this.rollPeriodMs = rollPeriodMs;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalRegionParams archivedWalSuffix(String archivedWalSuffix) {
|
||||
public MasterRegionParams archivedWalSuffix(String archivedWalSuffix) {
|
||||
this.archivedWalSuffix = archivedWalSuffix;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalRegionParams archivedHFileSuffix(String archivedHFileSuffix) {
|
||||
public MasterRegionParams archivedHFileSuffix(String archivedHFileSuffix) {
|
||||
this.archivedHFileSuffix = archivedHFileSuffix;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MasterRegionParams useMetaCellComparator(boolean useMetaCellComparator) {
|
||||
this.useMetaCellComparator = useMetaCellComparator;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Server server() {
|
||||
return server;
|
||||
}
|
||||
|
@ -169,4 +176,8 @@ public class LocalRegionParams {
|
|||
public String archivedHFileSuffix() {
|
||||
return archivedHFileSuffix;
|
||||
}
|
||||
|
||||
public Boolean useMetaCellComparator() {
|
||||
return useMetaCellComparator;
|
||||
}
|
||||
}
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.store;
|
||||
package org.apache.hadoop.hbase.master.region;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
|
@ -26,11 +26,11 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
final class LocalRegionUtils {
|
||||
final class MasterRegionUtils {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LocalRegionUtils.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MasterRegionUtils.class);
|
||||
|
||||
private LocalRegionUtils() {
|
||||
private MasterRegionUtils() {
|
||||
}
|
||||
|
||||
static void moveFilesUnderDir(FileSystem fs, Path src, Path dst, String suffix)
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.store;
|
||||
package org.apache.hadoop.hbase.master.region;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;
|
||||
|
||||
|
@ -35,19 +35,19 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* As long as there is no RegionServerServices for a local region, we need implement log roller
|
||||
* logic by our own.
|
||||
* As long as there is no RegionServerServices for a master local region, we need implement log
|
||||
* roller logic by our own.
|
||||
* <p/>
|
||||
* We can reuse most of the code for normal wal roller, the only difference is that there is only
|
||||
* one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the
|
||||
* procedure store region.
|
||||
* one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the master
|
||||
* local region.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class LocalRegionWALRoller extends AbstractWALRoller<Abortable> {
|
||||
public final class MasterRegionWALRoller extends AbstractWALRoller<Abortable> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LocalRegionWALRoller.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MasterRegionWALRoller.class);
|
||||
|
||||
private volatile LocalRegionFlusherAndCompactor flusherAndCompactor;
|
||||
private volatile MasterRegionFlusherAndCompactor flusherAndCompactor;
|
||||
|
||||
private final FileSystem fs;
|
||||
|
||||
|
@ -57,7 +57,7 @@ public final class LocalRegionWALRoller extends AbstractWALRoller<Abortable> {
|
|||
|
||||
private final String archivedWALSuffix;
|
||||
|
||||
private LocalRegionWALRoller(String name, Configuration conf, Abortable abortable, FileSystem fs,
|
||||
private MasterRegionWALRoller(String name, Configuration conf, Abortable abortable, FileSystem fs,
|
||||
Path walRootDir, Path globalWALRootDir, String archivedWALSuffix) {
|
||||
super(name, conf, abortable);
|
||||
this.fs = fs;
|
||||
|
@ -70,7 +70,8 @@ public final class LocalRegionWALRoller extends AbstractWALRoller<Abortable> {
|
|||
protected void afterRoll(WAL wal) {
|
||||
// move the archived WAL files to the global archive path
|
||||
try {
|
||||
LocalRegionUtils.moveFilesUnderDir(fs, walArchiveDir, globalWALArchiveDir, archivedWALSuffix);
|
||||
MasterRegionUtils.moveFilesUnderDir(fs, walArchiveDir, globalWALArchiveDir,
|
||||
archivedWALSuffix);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to move archived wals from {} to global dir {}", walArchiveDir,
|
||||
globalWALArchiveDir, e);
|
||||
|
@ -79,17 +80,17 @@ public final class LocalRegionWALRoller extends AbstractWALRoller<Abortable> {
|
|||
|
||||
@Override
|
||||
protected void scheduleFlush(String encodedRegionName) {
|
||||
LocalRegionFlusherAndCompactor flusher = this.flusherAndCompactor;
|
||||
MasterRegionFlusherAndCompactor flusher = this.flusherAndCompactor;
|
||||
if (flusher != null) {
|
||||
flusher.requestFlush();
|
||||
}
|
||||
}
|
||||
|
||||
void setFlusherAndCompactor(LocalRegionFlusherAndCompactor flusherAndCompactor) {
|
||||
void setFlusherAndCompactor(MasterRegionFlusherAndCompactor flusherAndCompactor) {
|
||||
this.flusherAndCompactor = flusherAndCompactor;
|
||||
}
|
||||
|
||||
static LocalRegionWALRoller create(String name, Configuration conf, Abortable abortable,
|
||||
static MasterRegionWALRoller create(String name, Configuration conf, Abortable abortable,
|
||||
FileSystem fs, Path walRootDir, Path globalWALRootDir, String archivedWALSuffix,
|
||||
long rollPeriodMs, long flushSize) {
|
||||
// we can not run with wal disabled, so force set it to true.
|
||||
|
@ -100,7 +101,7 @@ public final class LocalRegionWALRoller extends AbstractWALRoller<Abortable> {
|
|||
// make the roll size the same with the flush size, as we only have one region here
|
||||
conf.setLong(WALUtil.WAL_BLOCK_SIZE, flushSize * 2);
|
||||
conf.setFloat(AbstractFSWAL.WAL_ROLL_MULTIPLIER, 0.5f);
|
||||
return new LocalRegionWALRoller(name, conf, abortable, fs, walRootDir, globalWALRootDir,
|
||||
return new MasterRegionWALRoller(name, conf, abortable, fs, walRootDir, globalWALRootDir,
|
||||
archivedWALSuffix);
|
||||
}
|
||||
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.store;
|
||||
package org.apache.hadoop.hbase.master.region;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
|
@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
|
||||
@InterfaceAudience.Private
|
||||
@FunctionalInterface
|
||||
public interface UpdateLocalRegion {
|
||||
public interface UpdateMasterRegion {
|
||||
|
||||
void update(HRegion region) throws IOException;
|
||||
}
|
|
@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
|||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.master.store.LocalStore;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
|
||||
|
@ -84,8 +84,8 @@ public class HFileProcedurePrettyPrinter extends AbstractHBaseTool {
|
|||
}
|
||||
|
||||
private void addAllHFiles() throws IOException {
|
||||
Path masterProcDir = new Path(CommonFSUtils.getRootDir(conf), LocalStore.MASTER_STORE_DIR);
|
||||
Path tableDir = CommonFSUtils.getTableDir(masterProcDir, LocalStore.TABLE_NAME);
|
||||
Path masterProcDir = new Path(CommonFSUtils.getRootDir(conf), MasterRegionFactory.MASTER_STORE_DIR);
|
||||
Path tableDir = CommonFSUtils.getTableDir(masterProcDir, MasterRegionFactory.TABLE_NAME);
|
||||
FileSystem fs = tableDir.getFileSystem(conf);
|
||||
Path regionDir =
|
||||
fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.procedure2.store.region;
|
|||
|
||||
import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
|
||||
import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
|
||||
import static org.apache.hadoop.hbase.master.store.LocalStore.PROC_FAMILY;
|
||||
import static org.apache.hadoop.hbase.master.region.MasterRegionFactory.PROC_FAMILY;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
|
@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
|
|||
import org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||
import org.apache.hadoop.hbase.master.store.LocalStore;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegion;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
|
||||
|
@ -88,13 +88,13 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
private final LeaseRecovery leaseRecovery;
|
||||
|
||||
@VisibleForTesting
|
||||
final LocalStore localStore;
|
||||
final MasterRegion region;
|
||||
|
||||
private int numThreads;
|
||||
|
||||
public RegionProcedureStore(Server server, LocalStore localStore, LeaseRecovery leaseRecovery) {
|
||||
public RegionProcedureStore(Server server, MasterRegion region, LeaseRecovery leaseRecovery) {
|
||||
this.server = server;
|
||||
this.localStore = localStore;
|
||||
this.region = region;
|
||||
this.leaseRecovery = leaseRecovery;
|
||||
}
|
||||
|
||||
|
@ -236,7 +236,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
|
||||
if (maxProcIdSet.longValue() > 0) {
|
||||
// let's add a fake row to retain the max proc id
|
||||
localStore.update(r -> r.put(new Put(Bytes.toBytes(maxProcIdSet.longValue()))
|
||||
region.update(r -> r.put(new Put(Bytes.toBytes(maxProcIdSet.longValue()))
|
||||
.addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
|
||||
}
|
||||
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
|
||||
|
@ -263,7 +263,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
long maxProcId = 0;
|
||||
|
||||
try (RegionScanner scanner =
|
||||
localStore.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER))) {
|
||||
region.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER))) {
|
||||
List<Cell> cells = new ArrayList<>();
|
||||
boolean moreRows;
|
||||
do {
|
||||
|
@ -333,7 +333,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
for (Procedure<?> subProc : subProcs) {
|
||||
serializePut(subProc, mutations, rowsToLock);
|
||||
}
|
||||
localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
|
||||
region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
|
||||
} catch (IOException e) {
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
|
||||
Arrays.toString(subProcs), e);
|
||||
|
@ -351,7 +351,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
for (Procedure<?> proc : procs) {
|
||||
serializePut(proc, mutations, rowsToLock);
|
||||
}
|
||||
localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
|
||||
region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
|
||||
} catch (IOException e) {
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
|
||||
throw new UncheckedIOException(e);
|
||||
|
@ -364,7 +364,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
runWithoutRpcCall(() -> {
|
||||
try {
|
||||
ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
|
||||
localStore.update(r -> r.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(PROC_FAMILY,
|
||||
region.update(r -> r.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(PROC_FAMILY,
|
||||
PROC_QUALIFIER, proto.toByteArray())));
|
||||
} catch (IOException e) {
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
|
||||
|
@ -376,7 +376,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
@Override
|
||||
public void delete(long procId) {
|
||||
try {
|
||||
localStore.update(r -> r.put(
|
||||
region.update(r -> r.put(
|
||||
new Put(Bytes.toBytes(procId)).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
|
||||
} catch (IOException e) {
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", procId, e);
|
||||
|
@ -393,7 +393,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
for (long subProcId : subProcIds) {
|
||||
serializeDelete(subProcId, mutations, rowsToLock);
|
||||
}
|
||||
localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
|
||||
region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
|
||||
} catch (IOException e) {
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc,
|
||||
Arrays.toString(subProcIds), e);
|
||||
|
@ -417,7 +417,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
serializeDelete(procId, mutations, rowsToLock);
|
||||
}
|
||||
try {
|
||||
localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
|
||||
region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
|
||||
} catch (IOException e) {
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(procIds), e);
|
||||
throw new UncheckedIOException(e);
|
||||
|
@ -429,7 +429,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
// actually delete the procedures if it is not the one with the max procedure id.
|
||||
List<Cell> cells = new ArrayList<Cell>();
|
||||
try (RegionScanner scanner =
|
||||
localStore.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER).setReversed(true))) {
|
||||
region.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER).setReversed(true))) {
|
||||
// skip the row with max procedure id
|
||||
boolean moreRows = scanner.next(cells);
|
||||
if (cells.isEmpty()) {
|
||||
|
@ -444,7 +444,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
Cell cell = cells.get(0);
|
||||
cells.clear();
|
||||
if (cell.getValueLength() == 0) {
|
||||
localStore.update(r -> r
|
||||
region.update(r -> r
|
||||
.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.procedure2.store.region;
|
||||
|
||||
import static org.apache.hadoop.hbase.master.store.LocalStore.PROC_FAMILY;
|
||||
import static org.apache.hadoop.hbase.master.region.MasterRegionFactory.PROC_FAMILY;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.time.Instant;
|
||||
|
|
|
@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.hadoop.hbase.CellBuilderType;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellComparatorImpl;
|
||||
import org.apache.hadoop.hbase.CellComparatorImpl.MetaCellComparator;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
|
@ -258,6 +259,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
public static final String SPECIAL_RECOVERED_EDITS_DIR =
|
||||
"hbase.hregion.special.recovered.edits.dir";
|
||||
|
||||
/**
|
||||
* Whether to use {@link MetaCellComparator} even if we are not meta region. Used when creating
|
||||
* master local region.
|
||||
*/
|
||||
public static final String USE_META_CELL_COMPARATOR = "hbase.region.use.meta.cell.comparator";
|
||||
|
||||
public static final boolean DEFAULT_USE_META_CELL_COMPARATOR = false;
|
||||
|
||||
final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
/* Closing can take some time; use the closing flag if there is stuff we don't
|
||||
|
@ -418,6 +427,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// Used for testing.
|
||||
private volatile Long timeoutForWriteLock = null;
|
||||
|
||||
private final CellComparator cellComparator;
|
||||
|
||||
/**
|
||||
* @return The smallest mvcc readPoint across all the scanners in this
|
||||
* region. Writes older than this readPoint, are included in every
|
||||
|
@ -771,9 +782,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
// 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
|
||||
this.baseConf = confParam;
|
||||
this.conf = new CompoundConfiguration()
|
||||
.add(confParam)
|
||||
.addBytesMap(htd.getValues());
|
||||
this.conf = new CompoundConfiguration().add(confParam).addBytesMap(htd.getValues());
|
||||
this.cellComparator = htd.isMetaTable() ||
|
||||
conf.getBoolean(USE_META_CELL_COMPARATOR, DEFAULT_USE_META_CELL_COMPARATOR) ?
|
||||
CellComparatorImpl.META_COMPARATOR :
|
||||
CellComparatorImpl.COMPARATOR;
|
||||
this.lock = new ReentrantReadWriteLock(conf.getBoolean(FAIR_REENTRANT_CLOSE_LOCK,
|
||||
DEFAULT_FAIR_REENTRANT_CLOSE_LOCK));
|
||||
this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
|
||||
|
@ -8873,8 +8886,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
@Override
|
||||
public CellComparator getCellComparator() {
|
||||
return this.getRegionInfo().isMetaRegion() ? CellComparatorImpl.META_COMPARATOR
|
||||
: CellComparatorImpl.COMPARATOR;
|
||||
return cellComparator;
|
||||
}
|
||||
|
||||
public long getMemStoreFlushSize() {
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.store;
|
||||
package org.apache.hadoop.hbase.master.region;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -39,11 +39,11 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
|
|||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
public class LocalRegionTestBase {
|
||||
public class MasterRegionTestBase {
|
||||
|
||||
protected HBaseCommonTestingUtility htu;
|
||||
|
||||
protected LocalRegion region;
|
||||
protected MasterRegion region;
|
||||
|
||||
protected ChoreService choreService;
|
||||
|
||||
|
@ -65,7 +65,7 @@ public class LocalRegionTestBase {
|
|||
protected void configure(Configuration conf) throws IOException {
|
||||
}
|
||||
|
||||
protected void configure(LocalRegionParams params) {
|
||||
protected void configure(MasterRegionParams params) {
|
||||
}
|
||||
|
||||
protected void postSetUp() throws IOException {
|
||||
|
@ -87,15 +87,15 @@ public class LocalRegionTestBase {
|
|||
when(server.getChoreService()).thenReturn(choreService);
|
||||
Path testDir = htu.getDataTestDir();
|
||||
CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
|
||||
LocalRegionParams params = new LocalRegionParams();
|
||||
MasterRegionParams params = new MasterRegionParams();
|
||||
params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(TD)
|
||||
.flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
|
||||
.flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(4).maxWals(32).useHsync(false)
|
||||
.ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
|
||||
.archivedWalSuffix(LocalStore.ARCHIVED_WAL_SUFFIX)
|
||||
.archivedHFileSuffix(LocalStore.ARCHIVED_HFILE_SUFFIX);
|
||||
.archivedWalSuffix(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
|
||||
.archivedHFileSuffix(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
|
||||
configure(params);
|
||||
region = LocalRegion.create(params);
|
||||
region = MasterRegion.create(params);
|
||||
postSetUp();
|
||||
}
|
||||
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.store;
|
||||
package org.apache.hadoop.hbase.master.region;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
@ -41,18 +41,18 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestLocalRegionCompaction extends LocalRegionTestBase {
|
||||
public class TestMasterRegionCompaction extends MasterRegionTestBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestLocalRegionCompaction.class);
|
||||
HBaseClassTestRule.forClass(TestMasterRegionCompaction.class);
|
||||
|
||||
private int compactMin = 4;
|
||||
|
||||
private HFileCleaner hfileCleaner;
|
||||
|
||||
@Override
|
||||
protected void configure(LocalRegionParams params) {
|
||||
protected void configure(MasterRegionParams params) {
|
||||
params.compactMin(compactMin);
|
||||
}
|
||||
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.store;
|
||||
package org.apache.hadoop.hbase.master.region;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -47,17 +47,17 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestLocalRegionFlush {
|
||||
public class TestMasterRegionFlush {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestLocalRegionFlush.class);
|
||||
HBaseClassTestRule.forClass(TestMasterRegionFlush.class);
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
private HRegion region;
|
||||
|
||||
private LocalRegionFlusherAndCompactor flusher;
|
||||
private MasterRegionFlusherAndCompactor flusher;
|
||||
|
||||
private AtomicInteger flushCalled;
|
||||
|
||||
|
@ -97,7 +97,7 @@ public class TestLocalRegionFlush {
|
|||
}
|
||||
|
||||
private void initFlusher(long flushSize, long flushPerChanges, long flushIntervalMs) {
|
||||
flusher = new LocalRegionFlusherAndCompactor(conf, new Abortable() {
|
||||
flusher = new MasterRegionFlusherAndCompactor(conf, new Abortable() {
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.store;
|
||||
package org.apache.hadoop.hbase.master.region;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
@ -69,12 +69,12 @@ import org.slf4j.LoggerFactory;
|
|||
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestLocalRegionOnTwoFileSystems {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestLocalRegionOnTwoFileSystems.class);
|
||||
public class TestMasterRegionOnTwoFileSystems {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegionOnTwoFileSystems.class);
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestLocalRegionOnTwoFileSystems.class);
|
||||
HBaseClassTestRule.forClass(TestMasterRegionOnTwoFileSystems.class);
|
||||
|
||||
private static final HBaseCommonTestingUtility HFILE_UTIL = new HBaseCommonTestingUtility();
|
||||
|
||||
|
@ -90,7 +90,7 @@ public class TestLocalRegionOnTwoFileSystems {
|
|||
|
||||
private static int COMPACT_MIN = 4;
|
||||
|
||||
private LocalRegion region;
|
||||
private MasterRegion region;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
|
@ -113,18 +113,18 @@ public class TestLocalRegionOnTwoFileSystems {
|
|||
HFILE_UTIL.cleanupTestDir();
|
||||
}
|
||||
|
||||
private LocalRegion createLocalRegion(ServerName serverName) throws IOException {
|
||||
private MasterRegion createMasterRegion(ServerName serverName) throws IOException {
|
||||
Server server = mock(Server.class);
|
||||
when(server.getConfiguration()).thenReturn(HFILE_UTIL.getConfiguration());
|
||||
when(server.getServerName()).thenReturn(serverName);
|
||||
LocalRegionParams params = new LocalRegionParams();
|
||||
MasterRegionParams params = new MasterRegionParams();
|
||||
params.server(server).regionDirName("local").tableDescriptor(TD)
|
||||
.flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
|
||||
.flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(COMPACT_MIN).maxWals(32)
|
||||
.useHsync(false).ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
|
||||
.archivedWalSuffix(LocalStore.ARCHIVED_WAL_SUFFIX)
|
||||
.archivedHFileSuffix(LocalStore.ARCHIVED_HFILE_SUFFIX);
|
||||
return LocalRegion.create(params);
|
||||
.archivedWalSuffix(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
|
||||
.archivedHFileSuffix(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
|
||||
return MasterRegion.create(params);
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -135,7 +135,7 @@ public class TestLocalRegionOnTwoFileSystems {
|
|||
Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
|
||||
FileSystem walFs = WAL_UTIL.getTestFileSystem();
|
||||
walFs.delete(walRootDir, true);
|
||||
region = createLocalRegion(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
|
||||
region = createMasterRegion(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -174,9 +174,8 @@ public class TestLocalRegionOnTwoFileSystems {
|
|||
return false;
|
||||
}
|
||||
});
|
||||
LOG.info("hfile archive content {}",
|
||||
Arrays.stream(rootFs.listStatus(storeArchiveDir)).map(f -> f.getPath().toString()).
|
||||
collect(Collectors.joining(",")));
|
||||
LOG.info("hfile archive content {}", Arrays.stream(rootFs.listStatus(storeArchiveDir))
|
||||
.map(f -> f.getPath().toString()).collect(Collectors.joining(",")));
|
||||
|
||||
// make sure the archived wal files are on the wal fs
|
||||
Path walArchiveDir = new Path(CommonFSUtils.getWALRootDir(HFILE_UTIL.getConfiguration()),
|
||||
|
@ -219,7 +218,7 @@ public class TestLocalRegionOnTwoFileSystems {
|
|||
region.update(r -> r.put(put));
|
||||
}
|
||||
region.close(true);
|
||||
region = createLocalRegion(
|
||||
region = createMasterRegion(
|
||||
ServerName.valueOf("localhost", 12345, System.currentTimeMillis() + round + 1));
|
||||
try (RegionScanner scanner = region.getScanner(new Scan())) {
|
||||
List<Cell> cells = new ArrayList<>();
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.store;
|
||||
package org.apache.hadoop.hbase.master.region;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
@ -40,11 +40,11 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestLocalRegionWALCleaner extends LocalRegionTestBase {
|
||||
public class TestMasterRegionWALCleaner extends MasterRegionTestBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestLocalRegionWALCleaner.class);
|
||||
HBaseClassTestRule.forClass(TestMasterRegionWALCleaner.class);
|
||||
|
||||
private static long TTL_MS = 5000;
|
||||
|
|
@ -29,7 +29,8 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
|
||||
import org.apache.hadoop.hbase.master.store.LocalStore;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegion;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
|
||||
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
|
||||
|
@ -113,7 +114,7 @@ public class RegionProcedureStorePerformanceEvaluation
|
|||
}
|
||||
}
|
||||
|
||||
private LocalStore localStore;
|
||||
private MasterRegion region;
|
||||
|
||||
@Override
|
||||
protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOException {
|
||||
|
@ -127,11 +128,11 @@ public class RegionProcedureStorePerformanceEvaluation
|
|||
int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
|
||||
ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
|
||||
initialCountPercentage, null);
|
||||
conf.setBoolean(LocalStore.USE_HSYNC_KEY, "hsync".equals(syncType));
|
||||
conf.setBoolean(MasterRegionFactory.USE_HSYNC_KEY, "hsync".equals(syncType));
|
||||
CommonFSUtils.setRootDir(conf, storeDir);
|
||||
MockServer server = new MockServer(conf);
|
||||
localStore = LocalStore.create(server);
|
||||
return new RegionProcedureStore(server, localStore, (fs, apth) -> {
|
||||
region = MasterRegionFactory.create(server);
|
||||
return new RegionProcedureStore(server, region, (fs, apth) -> {
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -148,7 +149,7 @@ public class RegionProcedureStorePerformanceEvaluation
|
|||
|
||||
@Override
|
||||
protected void postStop(RegionProcedureStore store) throws IOException {
|
||||
localStore.close(true);
|
||||
region.close(true);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
|
|
|
@ -22,7 +22,8 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.master.store.LocalStore;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegion;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
|
@ -37,7 +38,7 @@ public class RegionProcedureStoreTestBase {
|
|||
|
||||
protected HBaseCommonTestingUtility htu;
|
||||
|
||||
protected LocalStore localStore;
|
||||
protected MasterRegion region;
|
||||
|
||||
protected RegionProcedureStore store;
|
||||
|
||||
|
@ -51,14 +52,14 @@ public class RegionProcedureStoreTestBase {
|
|||
Path testDir = htu.getDataTestDir();
|
||||
CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
|
||||
Server server = RegionProcedureStoreTestHelper.mockServer(conf);
|
||||
localStore = LocalStore.create(server);
|
||||
store = RegionProcedureStoreTestHelper.createStore(server, localStore, new LoadCounter());
|
||||
region = MasterRegionFactory.create(server);
|
||||
store = RegionProcedureStoreTestHelper.createStore(server, region, new LoadCounter());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
store.stop(true);
|
||||
localStore.close(true);
|
||||
region.close(true);
|
||||
htu.cleanupTestDir();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.master.store.LocalStore;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegion;
|
||||
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
|
||||
|
||||
|
@ -43,9 +43,9 @@ final class RegionProcedureStoreTestHelper {
|
|||
return server;
|
||||
}
|
||||
|
||||
static RegionProcedureStore createStore(Server server, LocalStore localStore,
|
||||
static RegionProcedureStore createStore(Server server, MasterRegion region,
|
||||
ProcedureLoader loader) throws IOException {
|
||||
RegionProcedureStore store = new RegionProcedureStore(server, localStore, new LeaseRecovery() {
|
||||
RegionProcedureStore store = new RegionProcedureStore(server, region, new LeaseRecovery() {
|
||||
|
||||
@Override
|
||||
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.master.store.LocalStore;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -94,15 +94,15 @@ public class TestHFileProcedurePrettyPrinter extends RegionProcedureStoreTestBas
|
|||
store.insert(proc, null);
|
||||
procs.add(proc);
|
||||
}
|
||||
store.localStore.flush(true);
|
||||
store.region.flush(true);
|
||||
for (int i = 0; i < 5; i++) {
|
||||
store.delete(procs.get(i).getProcId());
|
||||
}
|
||||
store.localStore.flush(true);
|
||||
store.region.flush(true);
|
||||
store.cleanup();
|
||||
store.localStore.flush(true);
|
||||
store.region.flush(true);
|
||||
Path tableDir = CommonFSUtils.getTableDir(
|
||||
new Path(htu.getDataTestDir(), LocalStore.MASTER_STORE_DIR), LocalStore.TABLE_NAME);
|
||||
new Path(htu.getDataTestDir(), MasterRegionFactory.MASTER_STORE_DIR), MasterRegionFactory.TABLE_NAME);
|
||||
FileSystem fs = tableDir.getFileSystem(htu.getConfiguration());
|
||||
Path regionDir =
|
||||
fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
|
||||
|
|
|
@ -126,24 +126,24 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
|
|||
assertEquals(1, loader.getRunnableCount());
|
||||
|
||||
// the row should still be there
|
||||
assertTrue(store.localStore
|
||||
assertTrue(store.region
|
||||
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
|
||||
assertTrue(store.localStore
|
||||
assertTrue(store.region
|
||||
.get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
|
||||
|
||||
// proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc
|
||||
// id
|
||||
store.cleanup();
|
||||
assertTrue(store.localStore
|
||||
assertTrue(store.region
|
||||
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
|
||||
assertFalse(store.localStore
|
||||
assertFalse(store.region
|
||||
.get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
|
||||
|
||||
RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
|
||||
store.insert(proc4, null);
|
||||
store.cleanup();
|
||||
// proc3 should also be deleted as now proc4 holds the max proc id
|
||||
assertFalse(store.localStore
|
||||
assertFalse(store.region
|
||||
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
|
||||
}
|
||||
|
||||
|
|
|
@ -39,7 +39,8 @@ import org.apache.hadoop.hbase.Server;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
|
||||
import org.apache.hadoop.hbase.master.store.LocalStore;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegion;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
|
||||
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
|
||||
|
@ -67,7 +68,7 @@ public class TestRegionProcedureStoreMigration {
|
|||
|
||||
private Server server;
|
||||
|
||||
private LocalStore localStore;
|
||||
private MasterRegion region;
|
||||
|
||||
private RegionProcedureStore store;
|
||||
|
||||
|
@ -92,7 +93,7 @@ public class TestRegionProcedureStoreMigration {
|
|||
walStore.recoverLease();
|
||||
walStore.load(new LoadCounter());
|
||||
server = RegionProcedureStoreTestHelper.mockServer(conf);
|
||||
localStore = LocalStore.create(server);
|
||||
region = MasterRegionFactory.create(server);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -100,7 +101,7 @@ public class TestRegionProcedureStoreMigration {
|
|||
if (store != null) {
|
||||
store.stop(true);
|
||||
}
|
||||
localStore.close(true);
|
||||
region.close(true);
|
||||
walStore.stop(true);
|
||||
htu.cleanupTestDir();
|
||||
}
|
||||
|
@ -120,7 +121,7 @@ public class TestRegionProcedureStoreMigration {
|
|||
SortedSet<RegionProcedureStoreTestProcedure> loadedProcs =
|
||||
new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
|
||||
MutableLong maxProcIdSet = new MutableLong(0);
|
||||
store = RegionProcedureStoreTestHelper.createStore(server, localStore, new ProcedureLoader() {
|
||||
store = RegionProcedureStoreTestHelper.createStore(server, region, new ProcedureLoader() {
|
||||
|
||||
@Override
|
||||
public void setMaxProcId(long maxProcId) {
|
||||
|
@ -166,7 +167,7 @@ public class TestRegionProcedureStoreMigration {
|
|||
walStore.stop(true);
|
||||
|
||||
try {
|
||||
store = RegionProcedureStoreTestHelper.createStore(server, localStore, new LoadCounter());
|
||||
store = RegionProcedureStoreTestHelper.createStore(server, region, new LoadCounter());
|
||||
fail("Should fail since AssignProcedure is not supported");
|
||||
} catch (HBaseIOException e) {
|
||||
assertThat(e.getMessage(), startsWith("Unsupported"));
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.master.store.LocalStore;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
@ -58,18 +58,18 @@ public class TestWALProcedurePrettyPrinter extends RegionProcedureStoreTestBase
|
|||
store.insert(proc, null);
|
||||
procs.add(proc);
|
||||
}
|
||||
store.localStore.flush(true);
|
||||
store.region.flush(true);
|
||||
for (int i = 0; i < 5; i++) {
|
||||
store.delete(procs.get(i).getProcId());
|
||||
}
|
||||
store.cleanup();
|
||||
Path walParentDir = new Path(htu.getDataTestDir(),
|
||||
LocalStore.MASTER_STORE_DIR + "/" + HConstants.HREGION_LOGDIR_NAME);
|
||||
MasterRegionFactory.MASTER_STORE_DIR + "/" + HConstants.HREGION_LOGDIR_NAME);
|
||||
FileSystem fs = walParentDir.getFileSystem(htu.getConfiguration());
|
||||
Path walDir = fs.listStatus(walParentDir)[0].getPath();
|
||||
Path walFile = fs.listStatus(walDir)[0].getPath();
|
||||
store.localStore.requestRollAll();
|
||||
store.localStore.waitUntilWalRollFinished();
|
||||
store.region.requestRollAll();
|
||||
store.region.waitUntilWalRollFinished();
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
PrintStream out = new PrintStream(bos);
|
||||
WALProcedurePrettyPrinter printer = new WALProcedurePrettyPrinter(out);
|
||||
|
|
Loading…
Reference in New Issue