HBASE-14448 Refine RegionGroupingProvider Phase-2: remove provider nesting and formalize wal group name (Yu Li)
This commit is contained in:
parent
a7afc132e2
commit
bf85960131
|
@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy;
|
import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A WAL grouping strategy that limits the number of delegate providers (i.e. wal group) to
|
* A WAL grouping strategy that limits the number of wal groups to
|
||||||
* "hbase.wal.regiongrouping.numgroups".
|
* "hbase.wal.regiongrouping.numgroups".
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
|
|
@ -74,7 +74,7 @@ public class DefaultWALProvider implements WALProvider {
|
||||||
void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
|
void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected FSHLog log = null;
|
protected volatile FSHLog log = null;
|
||||||
private WALFactory factory = null;
|
private WALFactory factory = null;
|
||||||
private Configuration conf = null;
|
private Configuration conf = null;
|
||||||
private List<WALActionsListener> listeners = null;
|
private List<WALActionsListener> listeners = null;
|
||||||
|
@ -119,13 +119,16 @@ public class DefaultWALProvider implements WALProvider {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public WAL getWAL(final byte[] identifier) throws IOException {
|
public WAL getWAL(final byte[] identifier) throws IOException {
|
||||||
// must lock since getWAL will create hlog on fs which is time consuming
|
if (log == null) {
|
||||||
synchronized (walCreateLock) {
|
// only lock when need to create wal, and need to lock since
|
||||||
if (log == null) {
|
// creating hlog on fs is time consuming
|
||||||
log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
|
synchronized (walCreateLock) {
|
||||||
getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf,
|
if (log == null) {
|
||||||
listeners, true, logPrefix,
|
log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
|
||||||
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
|
getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf,
|
||||||
|
listeners, true, logPrefix,
|
||||||
|
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return log;
|
return log;
|
||||||
|
|
|
@ -18,23 +18,30 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.wal;
|
package org.apache.hadoop.hbase.wal;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.wal.DefaultWALProvider.META_WAL_PROVIDER_ID;
|
||||||
|
import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
|
||||||
|
|
||||||
// imports for classes still in regionserver.wal
|
// imports for classes still in regionserver.wal
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A WAL Provider that returns a WAL per group of regions.
|
* A WAL Provider that returns a WAL per group of regions.
|
||||||
|
@ -43,14 +50,11 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
* property "hbase.wal.regiongrouping.strategy". Current strategy choices are
|
* property "hbase.wal.regiongrouping.strategy". Current strategy choices are
|
||||||
* <ul>
|
* <ul>
|
||||||
* <li><em>defaultStrategy</em> : Whatever strategy this version of HBase picks. currently
|
* <li><em>defaultStrategy</em> : Whatever strategy this version of HBase picks. currently
|
||||||
* "identity".</li>
|
* "bounded".</li>
|
||||||
* <li><em>identity</em> : each region belongs to its own group.</li>
|
* <li><em>identity</em> : each region belongs to its own group.</li>
|
||||||
|
* <li><em>bounded</em> : bounded number of groups and region evenly assigned to each group.</li>
|
||||||
* </ul>
|
* </ul>
|
||||||
* Optionally, a FQCN to a custom implementation may be given.
|
* Optionally, a FQCN to a custom implementation may be given.
|
||||||
*
|
|
||||||
* WAL creation is delegated to another WALProvider, configured via the property
|
|
||||||
* "hbase.wal.regiongrouping.delegate". The property takes the same options as "hbase.wal.provider"
|
|
||||||
* (ref {@link WALFactory}) and defaults to the defaultProvider.
|
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class RegionGroupingProvider implements WALProvider {
|
class RegionGroupingProvider implements WALProvider {
|
||||||
|
@ -120,21 +124,22 @@ class RegionGroupingProvider implements WALProvider {
|
||||||
public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
|
public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
|
||||||
public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
|
public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
|
||||||
|
|
||||||
static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate";
|
private static final String META_WAL_GROUP_NAME = "meta";
|
||||||
static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider.name();
|
|
||||||
|
|
||||||
/** A group-provider mapping, recommended to make sure one-one rather than many-one mapping */
|
/** A group-wal mapping, recommended to make sure one-one rather than many-one mapping */
|
||||||
protected final ConcurrentMap<String, WALProvider> cached =
|
protected final Map<String, FSHLog> cached = new HashMap<String, FSHLog>();
|
||||||
new ConcurrentHashMap<String, WALProvider>();
|
/** Stores unique wals generated by this RegionGroupingProvider */
|
||||||
/** Stores delegation providers (no duplicated) used by this RegionGroupingProvider */
|
private final Set<FSHLog> logs = Collections.synchronizedSet(new HashSet<FSHLog>());
|
||||||
private final Set<WALProvider> providers = Collections
|
|
||||||
.synchronizedSet(new HashSet<WALProvider>());
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* we synchronize on walCacheLock to prevent wal recreation in different threads
|
||||||
|
*/
|
||||||
|
final Object walCacheLock = new Object();
|
||||||
|
|
||||||
protected RegionGroupingStrategy strategy = null;
|
protected RegionGroupingStrategy strategy = null;
|
||||||
private WALFactory factory = null;
|
|
||||||
private List<WALActionsListener> listeners = null;
|
private List<WALActionsListener> listeners = null;
|
||||||
private String providerId = null;
|
private String providerId = null;
|
||||||
|
private Configuration conf = null;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(final WALFactory factory, final Configuration conf,
|
public void init(final WALFactory factory, final Configuration conf,
|
||||||
|
@ -142,49 +147,80 @@ class RegionGroupingProvider implements WALProvider {
|
||||||
if (null != strategy) {
|
if (null != strategy) {
|
||||||
throw new IllegalStateException("WALProvider.init should only be called once.");
|
throw new IllegalStateException("WALProvider.init should only be called once.");
|
||||||
}
|
}
|
||||||
this.factory = factory;
|
|
||||||
this.listeners = null == listeners ? null : Collections.unmodifiableList(listeners);
|
this.listeners = null == listeners ? null : Collections.unmodifiableList(listeners);
|
||||||
this.providerId = providerId;
|
StringBuilder sb = new StringBuilder().append(factory.factoryId);
|
||||||
|
if (providerId != null) {
|
||||||
|
if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
|
||||||
|
sb.append(providerId);
|
||||||
|
} else {
|
||||||
|
sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.providerId = sb.toString();
|
||||||
this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
|
this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
|
||||||
|
this.conf = conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Populate the cache for this group.
|
* Populate the cache for this group.
|
||||||
*/
|
*/
|
||||||
WALProvider populateCache(final String group) throws IOException {
|
FSHLog populateCache(String groupName) throws IOException {
|
||||||
final WALProvider temp = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER,
|
boolean isMeta = META_WAL_PROVIDER_ID.equals(providerId);
|
||||||
listeners, providerId + "-" + UUID.randomUUID());
|
String hlogPrefix;
|
||||||
final WALProvider extant = cached.putIfAbsent(group, temp);
|
List<WALActionsListener> listeners;
|
||||||
if (null != extant) {
|
if (isMeta) {
|
||||||
// someone else beat us to initializing, just take what they set.
|
hlogPrefix = this.providerId;
|
||||||
temp.close();
|
// don't watch log roll for meta
|
||||||
return extant;
|
listeners = Collections.<WALActionsListener> singletonList(new MetricsWAL());
|
||||||
|
} else {
|
||||||
|
hlogPrefix = groupName;
|
||||||
|
listeners = this.listeners;
|
||||||
}
|
}
|
||||||
providers.add(temp);
|
FSHLog log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
|
||||||
return temp;
|
DefaultWALProvider.getWALDirectoryName(providerId), HConstants.HREGION_OLDLOGDIR_NAME,
|
||||||
|
conf, listeners, true, hlogPrefix, isMeta ? META_WAL_PROVIDER_ID : null);
|
||||||
|
cached.put(groupName, log);
|
||||||
|
logs.add(log);
|
||||||
|
return log;
|
||||||
|
}
|
||||||
|
|
||||||
|
private WAL getWAL(final String group) throws IOException {
|
||||||
|
WAL log = cached.get(walCacheLock);
|
||||||
|
if (null == log) {
|
||||||
|
// only lock when need to create wal, and need to lock since
|
||||||
|
// creating hlog on fs is time consuming
|
||||||
|
synchronized (this.walCacheLock) {
|
||||||
|
log = cached.get(group);// check again
|
||||||
|
if (null == log) {
|
||||||
|
log = populateCache(group);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return log;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public WAL getWAL(final byte[] identifier) throws IOException {
|
public WAL getWAL(final byte[] identifier) throws IOException {
|
||||||
final String group = strategy.group(identifier);
|
final String group;
|
||||||
WALProvider provider = cached.get(group);
|
if (META_WAL_PROVIDER_ID.equals(this.providerId)) {
|
||||||
if (null == provider) {
|
group = META_WAL_GROUP_NAME;
|
||||||
provider = populateCache(group);
|
} else {
|
||||||
|
group = strategy.group(identifier);
|
||||||
}
|
}
|
||||||
return provider.getWAL(identifier);
|
return getWAL(group);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void shutdown() throws IOException {
|
public void shutdown() throws IOException {
|
||||||
// save the last exception and rethrow
|
// save the last exception and rethrow
|
||||||
IOException failure = null;
|
IOException failure = null;
|
||||||
synchronized (providers) {
|
synchronized (logs) {
|
||||||
for (WALProvider provider : providers) {
|
for (FSHLog wal : logs) {
|
||||||
try {
|
try {
|
||||||
provider.shutdown();
|
wal.shutdown();
|
||||||
} catch (IOException exception) {
|
} catch (IOException exception) {
|
||||||
LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
|
LOG.error("Problem shutting down log '" + wal + "': " + exception.getMessage());
|
||||||
LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
|
LOG.debug("Details of problem shutting down log '" + wal + "'", exception);
|
||||||
failure = exception;
|
failure = exception;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -198,13 +234,13 @@ class RegionGroupingProvider implements WALProvider {
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
// save the last exception and rethrow
|
// save the last exception and rethrow
|
||||||
IOException failure = null;
|
IOException failure = null;
|
||||||
synchronized (providers) {
|
synchronized (logs) {
|
||||||
for (WALProvider provider : providers) {
|
for (FSHLog wal : logs) {
|
||||||
try {
|
try {
|
||||||
provider.close();
|
wal.close();
|
||||||
} catch (IOException exception) {
|
} catch (IOException exception) {
|
||||||
LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
|
LOG.error("Problem closing log '" + wal + "': " + exception.getMessage());
|
||||||
LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
|
LOG.debug("Details of problem closing wal '" + wal + "'", exception);
|
||||||
failure = exception;
|
failure = exception;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -226,9 +262,9 @@ class RegionGroupingProvider implements WALProvider {
|
||||||
@Override
|
@Override
|
||||||
public long getNumLogFiles() {
|
public long getNumLogFiles() {
|
||||||
long numLogFiles = 0;
|
long numLogFiles = 0;
|
||||||
synchronized (providers) {
|
synchronized (logs) {
|
||||||
for (WALProvider provider : providers) {
|
for (FSHLog wal : logs) {
|
||||||
numLogFiles += provider.getNumLogFiles();
|
numLogFiles += wal.getNumLogFiles();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return numLogFiles;
|
return numLogFiles;
|
||||||
|
@ -237,9 +273,9 @@ class RegionGroupingProvider implements WALProvider {
|
||||||
@Override
|
@Override
|
||||||
public long getLogFileSize() {
|
public long getLogFileSize() {
|
||||||
long logFileSize = 0;
|
long logFileSize = 0;
|
||||||
synchronized (providers) {
|
synchronized (logs) {
|
||||||
for (WALProvider provider : providers) {
|
for (FSHLog wal : logs) {
|
||||||
logFileSize += provider.getLogFileSize();
|
logFileSize += wal.getLogFileSize();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return logFileSize;
|
return logFileSize;
|
||||||
|
|
Loading…
Reference in New Issue