HBASE-24474 Rename LocalRegion to MasterRegion (#1811)

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Duo Zhang 2020-06-02 10:37:43 +08:00
parent 69999371c6
commit 6b43015fc9
28 changed files with 218 additions and 230 deletions

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Service;
import java.io.IOException;
@ -136,6 +137,8 @@ import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
@ -145,7 +148,6 @@ import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.mob.MobConstants;
@ -217,10 +219,12 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.webapp.WebAppContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
@ -435,7 +439,7 @@ public class HMaster extends HRegionServer implements MasterServices {
private ProcedureStore procedureStore;
// the master local storage to store procedure data, etc.
private LocalStore localStore;
private MasterRegion masterRegion;
// handle table states
private TableStateManager tableStateManager;
@ -935,8 +939,8 @@ public class HMaster extends HRegionServer implements MasterServices {
this.splitWALManager = new SplitWALManager(this);
}
// initialize local store
localStore = LocalStore.create(this);
// initialize master local region
masterRegion = MasterRegionFactory.create(this);
createProcedureExecutor();
Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType =
procedureExecutor.getActiveProceduresNoCopy().stream()
@ -1510,8 +1514,8 @@ public class HMaster extends HRegionServer implements MasterServices {
stopProcedureExecutor();
if (localStore != null) {
localStore.close(isAborted());
if (masterRegion != null) {
masterRegion.close(isAborted());
}
if (this.walManager != null) {
this.walManager.stop();
@ -1530,7 +1534,7 @@ public class HMaster extends HRegionServer implements MasterServices {
private void createProcedureExecutor() throws IOException {
MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
procedureStore =
new RegionProcedureStore(this, localStore, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
new RegionProcedureStore(this, masterRegion, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
procedureStore.registerListener(new ProcedureStoreListener() {
@Override

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.StealJobQueue;
import org.apache.yetus.audience.InterfaceAudience;
@ -161,7 +161,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
protected boolean validate(Path file) {
return HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent()) ||
StoreFileInfo.validateStoreFileName(file.getName()) ||
file.getName().endsWith(LocalStore.ARCHIVED_HFILE_SUFFIX);
file.getName().endsWith(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
}
/**

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -89,7 +89,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
protected boolean validate(Path file) {
return AbstractFSWALProvider.validateWALFilename(file.getName()) ||
MasterProcedureUtil.validateProcedureWALFilename(file.getName()) ||
file.getName().endsWith(LocalStore.ARCHIVED_WAL_SUFFIX);
file.getName().endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX);
}
@Override

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -42,7 +42,7 @@ public class TimeToLiveMasterLocalStoreHFileCleaner extends BaseTimeToLiveFileCl
@Override
protected boolean valiateFilename(Path file) {
return file.getName().endsWith(LocalStore.ARCHIVED_HFILE_SUFFIX);
return file.getName().endsWith(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
}
}

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -42,6 +42,6 @@ public class TimeToLiveMasterLocalStoreWALCleaner extends BaseTimeToLiveFileClea
@Override
protected boolean valiateFilename(Path file) {
return file.getName().endsWith(LocalStore.ARCHIVED_WAL_SUFFIX);
return file.getName().endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX);
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
package org.apache.hadoop.hbase.master.region;
import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
@ -54,7 +54,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
/**
* A region that stores data in a separated directory.
* A region that stores data in a separated directory, which can be used to store master local data.
* <p/>
* FileSystem layout:
*
@ -79,14 +79,14 @@ import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
* Notice that, you can use different root file system and WAL file system. Then the above directory
* will be on two file systems, the root file system will have the data directory while the WAL
* filesystem will have the WALs directory. The archived HFile will be moved to the global HFile
* archived directory with the {@link LocalRegionParams#archivedWalSuffix()} suffix. The archived
* archived directory with the {@link MasterRegionParams#archivedWalSuffix()} suffix. The archived
* WAL will be moved to the global WAL archived directory with the
* {@link LocalRegionParams#archivedHFileSuffix()} suffix.
* {@link MasterRegionParams#archivedHFileSuffix()} suffix.
*/
@InterfaceAudience.Private
public final class LocalRegion {
public final class MasterRegion {
private static final Logger LOG = LoggerFactory.getLogger(LocalRegion.class);
private static final Logger LOG = LoggerFactory.getLogger(MasterRegion.class);
private static final String REPLAY_EDITS_DIR = "recovered.wals";
@ -100,12 +100,12 @@ public final class LocalRegion {
final HRegion region;
@VisibleForTesting
final LocalRegionFlusherAndCompactor flusherAndCompactor;
final MasterRegionFlusherAndCompactor flusherAndCompactor;
private LocalRegionWALRoller walRoller;
private MasterRegionWALRoller walRoller;
private LocalRegion(HRegion region, WALFactory walFactory,
LocalRegionFlusherAndCompactor flusherAndCompactor, LocalRegionWALRoller walRoller) {
private MasterRegion(HRegion region, WALFactory walFactory,
MasterRegionFlusherAndCompactor flusherAndCompactor, MasterRegionWALRoller walRoller) {
this.region = region;
this.walFactory = walFactory;
this.flusherAndCompactor = flusherAndCompactor;
@ -128,7 +128,7 @@ public final class LocalRegion {
}
}
public void update(UpdateLocalRegion action) throws IOException {
public void update(UpdateMasterRegion action) throws IOException {
action.update(region);
flusherAndCompactor.onUpdate();
}
@ -142,17 +142,17 @@ public final class LocalRegion {
}
@VisibleForTesting
FlushResult flush(boolean force) throws IOException {
public FlushResult flush(boolean force) throws IOException {
return region.flush(force);
}
@VisibleForTesting
void requestRollAll() {
public void requestRollAll() {
walRoller.requestRollAll();
}
@VisibleForTesting
void waitUntilWalRollFinished() throws InterruptedException {
public void waitUntilWalRollFinished() throws InterruptedException {
walRoller.waitUntilWalRollFinished();
}
@ -176,7 +176,7 @@ public final class LocalRegion {
}
}
private static WAL createWAL(WALFactory walFactory, LocalRegionWALRoller walRoller,
private static WAL createWAL(WALFactory walFactory, MasterRegionWALRoller walRoller,
String serverName, FileSystem walFs, Path walRootDir, RegionInfo regionInfo)
throws IOException {
String logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
@ -197,7 +197,7 @@ public final class LocalRegion {
private static HRegion bootstrap(Configuration conf, TableDescriptor td, FileSystem fs,
Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
LocalRegionWALRoller walRoller, String serverName) throws IOException {
MasterRegionWALRoller walRoller, String serverName) throws IOException {
TableName tn = td.getTableName();
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setRegionId(REGION_ID).build();
Path tmpTableDir = CommonFSUtils.getTableDir(rootDir,
@ -215,7 +215,7 @@ public final class LocalRegion {
}
private static HRegion open(Configuration conf, TableDescriptor td, FileSystem fs, Path rootDir,
FileSystem walFs, Path walRootDir, WALFactory walFactory, LocalRegionWALRoller walRoller,
FileSystem walFs, Path walRootDir, WALFactory walFactory, MasterRegionWALRoller walRoller,
String serverName) throws IOException {
Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
Path regionDir =
@ -269,7 +269,7 @@ public final class LocalRegion {
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
}
public static LocalRegion create(LocalRegionParams params) throws IOException {
public static MasterRegion create(MasterRegionParams params) throws IOException {
TableDescriptor td = params.tableDescriptor();
LOG.info("Create or load local region for table " + td);
Server server = params.server();
@ -284,18 +284,21 @@ public final class LocalRegion {
Configuration conf = new Configuration(baseConf);
CommonFSUtils.setRootDir(conf, rootDir);
CommonFSUtils.setWALRootDir(conf, walRootDir);
LocalRegionFlusherAndCompactor.setupConf(conf, params.flushSize(), params.flushPerChanges(),
MasterRegionFlusherAndCompactor.setupConf(conf, params.flushSize(), params.flushPerChanges(),
params.flushIntervalMs());
conf.setInt(AbstractFSWAL.MAX_LOGS, params.maxWals());
if (params.useHsync() != null) {
conf.setBoolean(HRegion.WAL_HSYNC_CONF_KEY, params.useHsync());
}
if (params.useMetaCellComparator() != null) {
conf.setBoolean(HRegion.USE_META_CELL_COMPARATOR, params.useMetaCellComparator());
}
conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT,
IntMath.ceilingPowerOfTwo(params.ringBufferSlotCount()));
LocalRegionWALRoller walRoller = LocalRegionWALRoller.create(td.getTableName() + "-WAL-Roller",
conf, server, walFs, walRootDir, globalWALRootDir, params.archivedWalSuffix(),
params.rollPeriodMs(), params.flushSize());
MasterRegionWALRoller walRoller = MasterRegionWALRoller.create(
td.getTableName() + "-WAL-Roller", conf, server, walFs, walRootDir, globalWALRootDir,
params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize());
walRoller.start();
WALFactory walFactory = new WALFactory(conf, server.getServerName().toString());
@ -311,7 +314,7 @@ public final class LocalRegion {
server.getServerName().toString());
}
Path globalArchiveDir = HFileArchiveUtil.getArchivePath(baseConf);
LocalRegionFlusherAndCompactor flusherAndCompactor = new LocalRegionFlusherAndCompactor(conf,
MasterRegionFlusherAndCompactor flusherAndCompactor = new MasterRegionFlusherAndCompactor(conf,
server, region, params.flushSize(), params.flushPerChanges(), params.flushIntervalMs(),
params.compactMin(), globalArchiveDir, params.archivedHFileSuffix());
walRoller.setFlusherAndCompactor(flusherAndCompactor);
@ -320,6 +323,6 @@ public final class LocalRegion {
LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will" +
" be created again when we actually archive the hfiles later, so continue", archiveDir);
}
return new LocalRegion(region, walFactory, flusherAndCompactor, walRoller);
return new MasterRegion(region, walFactory, flusherAndCompactor, walRoller);
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
package org.apache.hadoop.hbase.master.region;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@ -23,23 +23,16 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* Used for storing data at master side. The data will be stored in a {@link LocalRegion}.
* The factory class for creating a {@link MasterRegion}.
*/
@InterfaceAudience.Private
public final class LocalStore {
public final class MasterRegionFactory {
// Use the character $ to let the log cleaner know that this is not the normal wal file.
public static final String ARCHIVED_WAL_SUFFIX = "$masterlocalwal$";
@ -89,45 +82,8 @@ public final class LocalStore {
private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY)).build();
private final LocalRegion region;
private LocalStore(LocalRegion region) {
this.region = region;
}
public void update(UpdateLocalRegion action) throws IOException {
region.update(action);
}
public Result get(Get get) throws IOException {
return region.get(get);
}
public RegionScanner getScanner(Scan scan) throws IOException {
return region.getScanner(scan);
}
public void close(boolean abort) {
region.close(abort);
}
@VisibleForTesting
public FlushResult flush(boolean force) throws IOException {
return region.flush(force);
}
@VisibleForTesting
public void requestRollAll() {
region.requestRollAll();
}
@VisibleForTesting
public void waitUntilWalRollFinished() throws InterruptedException {
region.waitUntilWalRollFinished();
}
public static LocalStore create(Server server) throws IOException {
LocalRegionParams params = new LocalRegionParams().server(server)
public static MasterRegion create(Server server) throws IOException {
MasterRegionParams params = new MasterRegionParams().server(server)
.regionDirName(MASTER_STORE_DIR).tableDescriptor(TABLE_DESC);
Configuration conf = server.getConfiguration();
long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
@ -145,7 +101,6 @@ public final class LocalStore {
long rollPeriodMs = conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS);
params.rollPeriodMs(rollPeriodMs).archivedWalSuffix(ARCHIVED_WAL_SUFFIX)
.archivedHFileSuffix(ARCHIVED_HFILE_SUFFIX);
LocalRegion region = LocalRegion.create(params);
return new LocalStore(region);
return MasterRegion.create(params);
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
package org.apache.hadoop.hbase.master.region;
import java.io.Closeable;
import java.io.IOException;
@ -45,8 +45,8 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* As long as there is no RegionServerServices for a 'local' region, we need implement the flush and
* compaction logic by our own.
* As long as there is no RegionServerServices for a master local region, we need implement the
* flush and compaction logic by our own.
* <p/>
* The flush logic is very simple, every time after calling a modification method in
* {@link RegionProcedureStore}, we will call the {@link #onUpdate()} method below, and in this
@ -57,9 +57,9 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* count, if it is above the compactMin, we will do a major compaction.
*/
@InterfaceAudience.Private
class LocalRegionFlusherAndCompactor implements Closeable {
class MasterRegionFlusherAndCompactor implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(LocalRegionFlusherAndCompactor.class);
private static final Logger LOG = LoggerFactory.getLogger(MasterRegionFlusherAndCompactor.class);
private final Configuration conf;
@ -101,7 +101,7 @@ class LocalRegionFlusherAndCompactor implements Closeable {
private volatile boolean closed = false;
LocalRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region,
MasterRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region,
long flushSize, long flushPerChanges, long flushIntervalMs, int compactMin,
Path globalArchivePath, String archivedHFileSuffix) {
this.conf = conf;
@ -142,7 +142,7 @@ class LocalRegionFlusherAndCompactor implements Closeable {
Path globalStoreArchiveDir = HFileArchiveUtil.getStoreArchivePathForArchivePath(
globalArchivePath, region.getRegionInfo(), store.getColumnFamilyDescriptor().getName());
try {
LocalRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir,
MasterRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir,
archivedHFileSuffix);
} catch (IOException e) {
LOG.warn("Failed to move archived hfiles from {} to global dir {}", storeArchiveDir,
@ -156,7 +156,7 @@ class LocalRegionFlusherAndCompactor implements Closeable {
region.compact(true);
moveHFileToGlobalArchiveDir();
} catch (IOException e) {
LOG.error("Failed to compact procedure store region", e);
LOG.error("Failed to compact master local region", e);
}
compactLock.lock();
try {
@ -207,8 +207,8 @@ class LocalRegionFlusherAndCompactor implements Closeable {
region.flush(true);
lastFlushTime = EnvironmentEdgeManager.currentTime();
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to flush procedure store region, aborting...", e);
abortable.abort("Failed to flush procedure store region", e);
LOG.error(HBaseMarkers.FATAL, "Failed to flush master local region, aborting...", e);
abortable.abort("Failed to flush master local region", e);
return;
}
compactLock.lock();

View File

@ -15,17 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
package org.apache.hadoop.hbase.master.region;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.yetus.audience.InterfaceAudience;
/**
* The parameters for constructing {@link LocalRegion}.
* The parameters for constructing {@link MasterRegion}.
*/
@InterfaceAudience.Private
public class LocalRegionParams {
public class MasterRegionParams {
private Server server;
@ -53,71 +53,78 @@ public class LocalRegionParams {
private String archivedHFileSuffix;
public LocalRegionParams server(Server server) {
private Boolean useMetaCellComparator;
public MasterRegionParams server(Server server) {
this.server = server;
return this;
}
public LocalRegionParams regionDirName(String regionDirName) {
public MasterRegionParams regionDirName(String regionDirName) {
this.regionDirName = regionDirName;
return this;
}
public LocalRegionParams tableDescriptor(TableDescriptor tableDescriptor) {
public MasterRegionParams tableDescriptor(TableDescriptor tableDescriptor) {
this.tableDescriptor = tableDescriptor;
return this;
}
public LocalRegionParams flushSize(long flushSize) {
public MasterRegionParams flushSize(long flushSize) {
this.flushSize = flushSize;
return this;
}
public LocalRegionParams flushPerChanges(long flushPerChanges) {
public MasterRegionParams flushPerChanges(long flushPerChanges) {
this.flushPerChanges = flushPerChanges;
return this;
}
public LocalRegionParams flushIntervalMs(long flushIntervalMs) {
public MasterRegionParams flushIntervalMs(long flushIntervalMs) {
this.flushIntervalMs = flushIntervalMs;
return this;
}
public LocalRegionParams compactMin(int compactMin) {
public MasterRegionParams compactMin(int compactMin) {
this.compactMin = compactMin;
return this;
}
public LocalRegionParams maxWals(int maxWals) {
public MasterRegionParams maxWals(int maxWals) {
this.maxWals = maxWals;
return this;
}
public LocalRegionParams useHsync(boolean useHsync) {
public MasterRegionParams useHsync(boolean useHsync) {
this.useHsync = useHsync;
return this;
}
public LocalRegionParams ringBufferSlotCount(int ringBufferSlotCount) {
public MasterRegionParams ringBufferSlotCount(int ringBufferSlotCount) {
this.ringBufferSlotCount = ringBufferSlotCount;
return this;
}
public LocalRegionParams rollPeriodMs(long rollPeriodMs) {
public MasterRegionParams rollPeriodMs(long rollPeriodMs) {
this.rollPeriodMs = rollPeriodMs;
return this;
}
public LocalRegionParams archivedWalSuffix(String archivedWalSuffix) {
public MasterRegionParams archivedWalSuffix(String archivedWalSuffix) {
this.archivedWalSuffix = archivedWalSuffix;
return this;
}
public LocalRegionParams archivedHFileSuffix(String archivedHFileSuffix) {
public MasterRegionParams archivedHFileSuffix(String archivedHFileSuffix) {
this.archivedHFileSuffix = archivedHFileSuffix;
return this;
}
public MasterRegionParams useMetaCellComparator(boolean useMetaCellComparator) {
this.useMetaCellComparator = useMetaCellComparator;
return this;
}
public Server server() {
return server;
}
@ -169,4 +176,8 @@ public class LocalRegionParams {
public String archivedHFileSuffix() {
return archivedHFileSuffix;
}
public Boolean useMetaCellComparator() {
return useMetaCellComparator;
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
package org.apache.hadoop.hbase.master.region;
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
@ -26,11 +26,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
final class LocalRegionUtils {
final class MasterRegionUtils {
private static final Logger LOG = LoggerFactory.getLogger(LocalRegionUtils.class);
private static final Logger LOG = LoggerFactory.getLogger(MasterRegionUtils.class);
private LocalRegionUtils() {
private MasterRegionUtils() {
}
static void moveFilesUnderDir(FileSystem fs, Path src, Path dst, String suffix)

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
package org.apache.hadoop.hbase.master.region;
import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;
@ -35,19 +35,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* As long as there is no RegionServerServices for a local region, we need implement log roller
* logic by our own.
* As long as there is no RegionServerServices for a master local region, we need implement log
* roller logic by our own.
* <p/>
* We can reuse most of the code for normal wal roller, the only difference is that there is only
* one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the
* procedure store region.
* one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the master
* local region.
*/
@InterfaceAudience.Private
public final class LocalRegionWALRoller extends AbstractWALRoller<Abortable> {
public final class MasterRegionWALRoller extends AbstractWALRoller<Abortable> {
private static final Logger LOG = LoggerFactory.getLogger(LocalRegionWALRoller.class);
private static final Logger LOG = LoggerFactory.getLogger(MasterRegionWALRoller.class);
private volatile LocalRegionFlusherAndCompactor flusherAndCompactor;
private volatile MasterRegionFlusherAndCompactor flusherAndCompactor;
private final FileSystem fs;
@ -57,7 +57,7 @@ public final class LocalRegionWALRoller extends AbstractWALRoller<Abortable> {
private final String archivedWALSuffix;
private LocalRegionWALRoller(String name, Configuration conf, Abortable abortable, FileSystem fs,
private MasterRegionWALRoller(String name, Configuration conf, Abortable abortable, FileSystem fs,
Path walRootDir, Path globalWALRootDir, String archivedWALSuffix) {
super(name, conf, abortable);
this.fs = fs;
@ -70,7 +70,8 @@ public final class LocalRegionWALRoller extends AbstractWALRoller<Abortable> {
protected void afterRoll(WAL wal) {
// move the archived WAL files to the global archive path
try {
LocalRegionUtils.moveFilesUnderDir(fs, walArchiveDir, globalWALArchiveDir, archivedWALSuffix);
MasterRegionUtils.moveFilesUnderDir(fs, walArchiveDir, globalWALArchiveDir,
archivedWALSuffix);
} catch (IOException e) {
LOG.warn("Failed to move archived wals from {} to global dir {}", walArchiveDir,
globalWALArchiveDir, e);
@ -79,17 +80,17 @@ public final class LocalRegionWALRoller extends AbstractWALRoller<Abortable> {
@Override
protected void scheduleFlush(String encodedRegionName) {
LocalRegionFlusherAndCompactor flusher = this.flusherAndCompactor;
MasterRegionFlusherAndCompactor flusher = this.flusherAndCompactor;
if (flusher != null) {
flusher.requestFlush();
}
}
void setFlusherAndCompactor(LocalRegionFlusherAndCompactor flusherAndCompactor) {
void setFlusherAndCompactor(MasterRegionFlusherAndCompactor flusherAndCompactor) {
this.flusherAndCompactor = flusherAndCompactor;
}
static LocalRegionWALRoller create(String name, Configuration conf, Abortable abortable,
static MasterRegionWALRoller create(String name, Configuration conf, Abortable abortable,
FileSystem fs, Path walRootDir, Path globalWALRootDir, String archivedWALSuffix,
long rollPeriodMs, long flushSize) {
// we can not run with wal disabled, so force set it to true.
@ -100,7 +101,7 @@ public final class LocalRegionWALRoller extends AbstractWALRoller<Abortable> {
// make the roll size the same with the flush size, as we only have one region here
conf.setLong(WALUtil.WAL_BLOCK_SIZE, flushSize * 2);
conf.setFloat(AbstractFSWAL.WAL_ROLL_MULTIPLIER, 0.5f);
return new LocalRegionWALRoller(name, conf, abortable, fs, walRootDir, globalWALRootDir,
return new MasterRegionWALRoller(name, conf, abortable, fs, walRootDir, globalWALRootDir,
archivedWALSuffix);
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
package org.apache.hadoop.hbase.master.region;
import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
@FunctionalInterface
public interface UpdateLocalRegion {
public interface UpdateMasterRegion {
void update(HRegion region) throws IOException;
}

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
@ -84,8 +84,8 @@ public class HFileProcedurePrettyPrinter extends AbstractHBaseTool {
}
private void addAllHFiles() throws IOException {
Path masterProcDir = new Path(CommonFSUtils.getRootDir(conf), LocalStore.MASTER_STORE_DIR);
Path tableDir = CommonFSUtils.getTableDir(masterProcDir, LocalStore.TABLE_NAME);
Path masterProcDir = new Path(CommonFSUtils.getRootDir(conf), MasterRegionFactory.MASTER_STORE_DIR);
Path tableDir = CommonFSUtils.getTableDir(masterProcDir, MasterRegionFactory.TABLE_NAME);
FileSystem fs = tableDir.getFileSystem(conf);
Path regionDir =
fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]

View File

@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.procedure2.store.region;
import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
import static org.apache.hadoop.hbase.master.store.LocalStore.PROC_FAMILY;
import static org.apache.hadoop.hbase.master.region.MasterRegionFactory.PROC_FAMILY;
import java.io.IOException;
import java.io.UncheckedIOException;
@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
import org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
@ -88,13 +88,13 @@ public class RegionProcedureStore extends ProcedureStoreBase {
private final LeaseRecovery leaseRecovery;
@VisibleForTesting
final LocalStore localStore;
final MasterRegion region;
private int numThreads;
public RegionProcedureStore(Server server, LocalStore localStore, LeaseRecovery leaseRecovery) {
public RegionProcedureStore(Server server, MasterRegion region, LeaseRecovery leaseRecovery) {
this.server = server;
this.localStore = localStore;
this.region = region;
this.leaseRecovery = leaseRecovery;
}
@ -236,7 +236,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
if (maxProcIdSet.longValue() > 0) {
// let's add a fake row to retain the max proc id
localStore.update(r -> r.put(new Put(Bytes.toBytes(maxProcIdSet.longValue()))
region.update(r -> r.put(new Put(Bytes.toBytes(maxProcIdSet.longValue()))
.addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
}
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
@ -263,7 +263,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
long maxProcId = 0;
try (RegionScanner scanner =
localStore.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER))) {
region.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER))) {
List<Cell> cells = new ArrayList<>();
boolean moreRows;
do {
@ -333,7 +333,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
for (Procedure<?> subProc : subProcs) {
serializePut(subProc, mutations, rowsToLock);
}
localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
Arrays.toString(subProcs), e);
@ -351,7 +351,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
for (Procedure<?> proc : procs) {
serializePut(proc, mutations, rowsToLock);
}
localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
throw new UncheckedIOException(e);
@ -364,7 +364,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
runWithoutRpcCall(() -> {
try {
ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
localStore.update(r -> r.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(PROC_FAMILY,
region.update(r -> r.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(PROC_FAMILY,
PROC_QUALIFIER, proto.toByteArray())));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
@ -376,7 +376,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
@Override
public void delete(long procId) {
try {
localStore.update(r -> r.put(
region.update(r -> r.put(
new Put(Bytes.toBytes(procId)).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", procId, e);
@ -393,7 +393,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
for (long subProcId : subProcIds) {
serializeDelete(subProcId, mutations, rowsToLock);
}
localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc,
Arrays.toString(subProcIds), e);
@ -417,7 +417,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
serializeDelete(procId, mutations, rowsToLock);
}
try {
localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(procIds), e);
throw new UncheckedIOException(e);
@ -429,7 +429,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
// actually delete the procedures if it is not the one with the max procedure id.
List<Cell> cells = new ArrayList<Cell>();
try (RegionScanner scanner =
localStore.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER).setReversed(true))) {
region.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER).setReversed(true))) {
// skip the row with max procedure id
boolean moreRows = scanner.next(cells);
if (cells.isEmpty()) {
@ -444,7 +444,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
Cell cell = cells.get(0);
cells.clear();
if (cell.getValueLength() == 0) {
localStore.update(r -> r
region.update(r -> r
.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())));
}
}

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.procedure2.store.region;
import static org.apache.hadoop.hbase.master.store.LocalStore.PROC_FAMILY;
import static org.apache.hadoop.hbase.master.region.MasterRegionFactory.PROC_FAMILY;
import java.io.PrintStream;
import java.time.Instant;

View File

@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellComparatorImpl.MetaCellComparator;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
@ -253,6 +254,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String SPECIAL_RECOVERED_EDITS_DIR =
"hbase.hregion.special.recovered.edits.dir";
/**
* Whether to use {@link MetaCellComparator} even if we are not meta region. Used when creating
* master local region.
*/
public static final String USE_META_CELL_COMPARATOR = "hbase.region.use.meta.cell.comparator";
public static final boolean DEFAULT_USE_META_CELL_COMPARATOR = false;
final AtomicBoolean closed = new AtomicBoolean(false);
/* Closing can take some time; use the closing flag if there is stuff we don't
@ -412,6 +421,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Used for testing.
private volatile Long timeoutForWriteLock = null;
private final CellComparator cellComparator;
/**
* @return The smallest mvcc readPoint across all the scanners in this
* region. Writes older than this readPoint, are included in every
@ -765,9 +776,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
this.baseConf = confParam;
this.conf = new CompoundConfiguration()
.add(confParam)
.addBytesMap(htd.getValues());
this.conf = new CompoundConfiguration().add(confParam).addBytesMap(htd.getValues());
this.cellComparator = htd.isMetaTable() ||
conf.getBoolean(USE_META_CELL_COMPARATOR, DEFAULT_USE_META_CELL_COMPARATOR) ?
CellComparatorImpl.META_COMPARATOR :
CellComparatorImpl.COMPARATOR;
this.lock = new ReentrantReadWriteLock(conf.getBoolean(FAIR_REENTRANT_CLOSE_LOCK,
DEFAULT_FAIR_REENTRANT_CLOSE_LOCK));
this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
@ -8849,8 +8862,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public CellComparator getCellComparator() {
return this.getRegionInfo().isMetaRegion() ? CellComparatorImpl.META_COMPARATOR
: CellComparatorImpl.COMPARATOR;
return cellComparator;
}
public long getMemStoreFlushSize() {

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
package org.apache.hadoop.hbase.master.region;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -39,11 +39,11 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.After;
import org.junit.Before;
public class LocalRegionTestBase {
public class MasterRegionTestBase {
protected HBaseCommonTestingUtility htu;
protected LocalRegion region;
protected MasterRegion region;
protected ChoreService choreService;
@ -65,7 +65,7 @@ public class LocalRegionTestBase {
protected void configure(Configuration conf) throws IOException {
}
protected void configure(LocalRegionParams params) {
protected void configure(MasterRegionParams params) {
}
protected void postSetUp() throws IOException {
@ -87,15 +87,15 @@ public class LocalRegionTestBase {
when(server.getChoreService()).thenReturn(choreService);
Path testDir = htu.getDataTestDir();
CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
LocalRegionParams params = new LocalRegionParams();
MasterRegionParams params = new MasterRegionParams();
params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(TD)
.flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
.flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(4).maxWals(32).useHsync(false)
.ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
.archivedWalSuffix(LocalStore.ARCHIVED_WAL_SUFFIX)
.archivedHFileSuffix(LocalStore.ARCHIVED_HFILE_SUFFIX);
.archivedWalSuffix(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
.archivedHFileSuffix(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
configure(params);
region = LocalRegion.create(params);
region = MasterRegion.create(params);
postSetUp();
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
package org.apache.hadoop.hbase.master.region;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -41,18 +41,18 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, MediumTests.class })
public class TestLocalRegionCompaction extends LocalRegionTestBase {
public class TestMasterRegionCompaction extends MasterRegionTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLocalRegionCompaction.class);
HBaseClassTestRule.forClass(TestMasterRegionCompaction.class);
private int compactMin = 4;
private HFileCleaner hfileCleaner;
@Override
protected void configure(LocalRegionParams params) {
protected void configure(MasterRegionParams params) {
params.compactMin(compactMin);
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
package org.apache.hadoop.hbase.master.region;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -47,17 +47,17 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, MediumTests.class })
public class TestLocalRegionFlush {
public class TestMasterRegionFlush {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLocalRegionFlush.class);
HBaseClassTestRule.forClass(TestMasterRegionFlush.class);
private Configuration conf;
private HRegion region;
private LocalRegionFlusherAndCompactor flusher;
private MasterRegionFlusherAndCompactor flusher;
private AtomicInteger flushCalled;
@ -97,7 +97,7 @@ public class TestLocalRegionFlush {
}
private void initFlusher(long flushSize, long flushPerChanges, long flushIntervalMs) {
flusher = new LocalRegionFlusherAndCompactor(conf, new Abortable() {
flusher = new MasterRegionFlusherAndCompactor(conf, new Abortable() {
@Override
public boolean isAborted() {

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
package org.apache.hadoop.hbase.master.region;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -69,12 +69,12 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
@Category({ MasterTests.class, MediumTests.class })
public class TestLocalRegionOnTwoFileSystems {
private static final Logger LOG = LoggerFactory.getLogger(TestLocalRegionOnTwoFileSystems.class);
public class TestMasterRegionOnTwoFileSystems {
private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegionOnTwoFileSystems.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLocalRegionOnTwoFileSystems.class);
HBaseClassTestRule.forClass(TestMasterRegionOnTwoFileSystems.class);
private static final HBaseCommonTestingUtility HFILE_UTIL = new HBaseCommonTestingUtility();
@ -90,7 +90,7 @@ public class TestLocalRegionOnTwoFileSystems {
private static int COMPACT_MIN = 4;
private LocalRegion region;
private MasterRegion region;
@BeforeClass
public static void setUp() throws Exception {
@ -113,18 +113,18 @@ public class TestLocalRegionOnTwoFileSystems {
HFILE_UTIL.cleanupTestDir();
}
private LocalRegion createLocalRegion(ServerName serverName) throws IOException {
private MasterRegion createMasterRegion(ServerName serverName) throws IOException {
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(HFILE_UTIL.getConfiguration());
when(server.getServerName()).thenReturn(serverName);
LocalRegionParams params = new LocalRegionParams();
MasterRegionParams params = new MasterRegionParams();
params.server(server).regionDirName("local").tableDescriptor(TD)
.flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
.flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(COMPACT_MIN).maxWals(32)
.useHsync(false).ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
.archivedWalSuffix(LocalStore.ARCHIVED_WAL_SUFFIX)
.archivedHFileSuffix(LocalStore.ARCHIVED_HFILE_SUFFIX);
return LocalRegion.create(params);
.archivedWalSuffix(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
.archivedHFileSuffix(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
return MasterRegion.create(params);
}
@Before
@ -135,7 +135,7 @@ public class TestLocalRegionOnTwoFileSystems {
Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
FileSystem walFs = WAL_UTIL.getTestFileSystem();
walFs.delete(walRootDir, true);
region = createLocalRegion(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
region = createMasterRegion(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
}
@After
@ -174,9 +174,8 @@ public class TestLocalRegionOnTwoFileSystems {
return false;
}
});
LOG.info("hfile archive content {}",
Arrays.stream(rootFs.listStatus(storeArchiveDir)).map(f -> f.getPath().toString()).
collect(Collectors.joining(",")));
LOG.info("hfile archive content {}", Arrays.stream(rootFs.listStatus(storeArchiveDir))
.map(f -> f.getPath().toString()).collect(Collectors.joining(",")));
// make sure the archived wal files are on the wal fs
Path walArchiveDir = new Path(CommonFSUtils.getWALRootDir(HFILE_UTIL.getConfiguration()),
@ -219,7 +218,7 @@ public class TestLocalRegionOnTwoFileSystems {
region.update(r -> r.put(put));
}
region.close(true);
region = createLocalRegion(
region = createMasterRegion(
ServerName.valueOf("localhost", 12345, System.currentTimeMillis() + round + 1));
try (RegionScanner scanner = region.getScanner(new Scan())) {
List<Cell> cells = new ArrayList<>();

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.store;
package org.apache.hadoop.hbase.master.region;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -40,11 +40,11 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, MediumTests.class })
public class TestLocalRegionWALCleaner extends LocalRegionTestBase {
public class TestMasterRegionWALCleaner extends MasterRegionTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLocalRegionWALCleaner.class);
HBaseClassTestRule.forClass(TestMasterRegionWALCleaner.class);
private static long TTL_MS = 5000;

View File

@ -29,7 +29,8 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
@ -113,7 +114,7 @@ public class RegionProcedureStorePerformanceEvaluation
}
}
private LocalStore localStore;
private MasterRegion region;
@Override
protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOException {
@ -127,11 +128,11 @@ public class RegionProcedureStorePerformanceEvaluation
int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
initialCountPercentage, null);
conf.setBoolean(LocalStore.USE_HSYNC_KEY, "hsync".equals(syncType));
conf.setBoolean(MasterRegionFactory.USE_HSYNC_KEY, "hsync".equals(syncType));
CommonFSUtils.setRootDir(conf, storeDir);
MockServer server = new MockServer(conf);
localStore = LocalStore.create(server);
return new RegionProcedureStore(server, localStore, (fs, apth) -> {
region = MasterRegionFactory.create(server);
return new RegionProcedureStore(server, region, (fs, apth) -> {
});
}
@ -148,7 +149,7 @@ public class RegionProcedureStorePerformanceEvaluation
@Override
protected void postStop(RegionProcedureStore store) throws IOException {
localStore.close(true);
region.close(true);
}
public static void main(String[] args) throws IOException {

View File

@ -22,7 +22,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@ -37,7 +38,7 @@ public class RegionProcedureStoreTestBase {
protected HBaseCommonTestingUtility htu;
protected LocalStore localStore;
protected MasterRegion region;
protected RegionProcedureStore store;
@ -51,14 +52,14 @@ public class RegionProcedureStoreTestBase {
Path testDir = htu.getDataTestDir();
CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
Server server = RegionProcedureStoreTestHelper.mockServer(conf);
localStore = LocalStore.create(server);
store = RegionProcedureStoreTestHelper.createStore(server, localStore, new LoadCounter());
region = MasterRegionFactory.create(server);
store = RegionProcedureStoreTestHelper.createStore(server, region, new LoadCounter());
}
@After
public void tearDown() throws IOException {
store.stop(true);
localStore.close(true);
region.close(true);
htu.cleanupTestDir();
}
}

View File

@ -26,7 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
@ -43,9 +43,9 @@ final class RegionProcedureStoreTestHelper {
return server;
}
static RegionProcedureStore createStore(Server server, LocalStore localStore,
static RegionProcedureStore createStore(Server server, MasterRegion region,
ProcedureLoader loader) throws IOException {
RegionProcedureStore store = new RegionProcedureStore(server, localStore, new LeaseRecovery() {
RegionProcedureStore store = new RegionProcedureStore(server, region, new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {

View File

@ -36,7 +36,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -94,15 +94,15 @@ public class TestHFileProcedurePrettyPrinter extends RegionProcedureStoreTestBas
store.insert(proc, null);
procs.add(proc);
}
store.localStore.flush(true);
store.region.flush(true);
for (int i = 0; i < 5; i++) {
store.delete(procs.get(i).getProcId());
}
store.localStore.flush(true);
store.region.flush(true);
store.cleanup();
store.localStore.flush(true);
store.region.flush(true);
Path tableDir = CommonFSUtils.getTableDir(
new Path(htu.getDataTestDir(), LocalStore.MASTER_STORE_DIR), LocalStore.TABLE_NAME);
new Path(htu.getDataTestDir(), MasterRegionFactory.MASTER_STORE_DIR), MasterRegionFactory.TABLE_NAME);
FileSystem fs = tableDir.getFileSystem(htu.getConfiguration());
Path regionDir =
fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]

View File

@ -126,24 +126,24 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
assertEquals(1, loader.getRunnableCount());
// the row should still be there
assertTrue(store.localStore
assertTrue(store.region
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
assertTrue(store.localStore
assertTrue(store.region
.get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
// proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc
// id
store.cleanup();
assertTrue(store.localStore
assertTrue(store.region
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
assertFalse(store.localStore
assertFalse(store.region
.get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
store.insert(proc4, null);
store.cleanup();
// proc3 should also be deleted as now proc4 holds the max proc id
assertFalse(store.localStore
assertFalse(store.region
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
}

View File

@ -39,7 +39,8 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@ -67,7 +68,7 @@ public class TestRegionProcedureStoreMigration {
private Server server;
private LocalStore localStore;
private MasterRegion region;
private RegionProcedureStore store;
@ -92,7 +93,7 @@ public class TestRegionProcedureStoreMigration {
walStore.recoverLease();
walStore.load(new LoadCounter());
server = RegionProcedureStoreTestHelper.mockServer(conf);
localStore = LocalStore.create(server);
region = MasterRegionFactory.create(server);
}
@After
@ -100,7 +101,7 @@ public class TestRegionProcedureStoreMigration {
if (store != null) {
store.stop(true);
}
localStore.close(true);
region.close(true);
walStore.stop(true);
htu.cleanupTestDir();
}
@ -120,7 +121,7 @@ public class TestRegionProcedureStoreMigration {
SortedSet<RegionProcedureStoreTestProcedure> loadedProcs =
new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
MutableLong maxProcIdSet = new MutableLong(0);
store = RegionProcedureStoreTestHelper.createStore(server, localStore, new ProcedureLoader() {
store = RegionProcedureStoreTestHelper.createStore(server, region, new ProcedureLoader() {
@Override
public void setMaxProcId(long maxProcId) {
@ -166,7 +167,7 @@ public class TestRegionProcedureStoreMigration {
walStore.stop(true);
try {
store = RegionProcedureStoreTestHelper.createStore(server, localStore, new LoadCounter());
store = RegionProcedureStoreTestHelper.createStore(server, region, new LoadCounter());
fail("Should fail since AssignProcedure is not supported");
} catch (HBaseIOException e) {
assertThat(e.getMessage(), startsWith("Unsupported"));

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.store.LocalStore;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.util.ToolRunner;
@ -58,18 +58,18 @@ public class TestWALProcedurePrettyPrinter extends RegionProcedureStoreTestBase
store.insert(proc, null);
procs.add(proc);
}
store.localStore.flush(true);
store.region.flush(true);
for (int i = 0; i < 5; i++) {
store.delete(procs.get(i).getProcId());
}
store.cleanup();
Path walParentDir = new Path(htu.getDataTestDir(),
LocalStore.MASTER_STORE_DIR + "/" + HConstants.HREGION_LOGDIR_NAME);
MasterRegionFactory.MASTER_STORE_DIR + "/" + HConstants.HREGION_LOGDIR_NAME);
FileSystem fs = walParentDir.getFileSystem(htu.getConfiguration());
Path walDir = fs.listStatus(walParentDir)[0].getPath();
Path walFile = fs.listStatus(walDir)[0].getPath();
store.localStore.requestRollAll();
store.localStore.waitUntilWalRollFinished();
store.region.requestRollAll();
store.region.waitUntilWalRollFinished();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bos);
WALProcedurePrettyPrinter printer = new WALProcedurePrettyPrinter(out);