HBASE-15537 Make multi WAL work with WALs other than FSHLog

This commit is contained in:
zhangduo 2016-04-06 17:04:28 +08:00
parent 2dcd08bc3d
commit 394b89d153
11 changed files with 273 additions and 150 deletions

View File

@ -678,6 +678,10 @@ public abstract class AbstractFSWAL<W> implements WAL {
// NewPath could be equal to oldPath if replaceWriter fails.
newPath = replaceWriter(oldPath, newPath, nextWriter);
tellListenersAboutPostLogRoll(oldPath, newPath);
if (LOG.isDebugEnabled()) {
LOG.debug("Create new " + getClass().getSimpleName() + " writer with pipeline: "
+ Arrays.toString(getPipeline()));
}
// Can we delete any of the old log files?
if (getNumRolledLogFiles() > 0) {
cleanOldLogs();

View File

@ -82,7 +82,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
* @param factory factory that made us, identity used for FS layout. may not be null
* @param conf may not be null
* @param listeners may be null
* @param providerId differentiate between providers from one facotry, used for FS layout. may be
* @param providerId differentiate between providers from one factory, used for FS layout. may be
* null
*/
@Override
@ -109,7 +109,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
}
@Override
public WAL getWAL(byte[] identifier, byte[] namespace) throws IOException {
public T getWAL(byte[] identifier, byte[] namespace) throws IOException {
T walCopy = wal;
if (walCopy == null) {
// only lock when need to create wal, and need to lock since

View File

@ -18,34 +18,31 @@
*/
package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.wal.DefaultWALProvider.META_WAL_PROVIDER_ID;
import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
// imports for classes still in regionserver.wal
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
/**
* A WAL Provider that returns a WAL per group of regions.
*
* This provider follows the decorator pattern and mainly holds the logic for WAL grouping.
* WAL creation/roll/close is delegated to {@link #DELEGATE_PROVIDER}
*
* Region grouping is handled via {@link RegionGroupingStrategy} and can be configured via the
* property "hbase.wal.regiongrouping.strategy". Current strategy choices are
* <ul>
@ -57,7 +54,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
* Optionally, a FQCN to a custom implementation may be given.
*/
@InterfaceAudience.Private
class RegionGroupingProvider implements WALProvider {
public class RegionGroupingProvider implements WALProvider {
private static final Log LOG = LogFactory.getLog(RegionGroupingProvider.class);
/**
@ -124,22 +121,23 @@ class RegionGroupingProvider implements WALProvider {
public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
/** delegate provider for WAL creation/roll/close */
public static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate.provider";
public static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider
.name();
private static final String META_WAL_GROUP_NAME = "meta";
/** A group-wal mapping, recommended to make sure one-one rather than many-one mapping */
protected final Map<String, FSHLog> cached = new HashMap<String, FSHLog>();
/** Stores unique wals generated by this RegionGroupingProvider */
private final Set<FSHLog> logs = Collections.synchronizedSet(new HashSet<FSHLog>());
/** A group-provider mapping, make sure one-one rather than many-one mapping */
private final ConcurrentMap<String, WALProvider> cached = new ConcurrentHashMap<>();
/**
* we synchronize on walCacheLock to prevent wal recreation in different threads
*/
final Object walCacheLock = new Object();
private final IdReadWriteLock createLock = new IdReadWriteLock();
protected RegionGroupingStrategy strategy = null;
private RegionGroupingStrategy strategy = null;
private WALFactory factory = null;
private List<WALActionsListener> listeners = null;
private String providerId = null;
private Configuration conf = null;
private Class<? extends WALProvider> providerClass;
@Override
public void init(final WALFactory factory, final Configuration conf,
@ -147,6 +145,7 @@ class RegionGroupingProvider implements WALProvider {
if (null != strategy) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}
this.factory = factory;
this.listeners = null == listeners ? null : Collections.unmodifiableList(listeners);
StringBuilder sb = new StringBuilder().append(factory.factoryId);
if (providerId != null) {
@ -158,45 +157,33 @@ class RegionGroupingProvider implements WALProvider {
}
this.providerId = sb.toString();
this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
this.conf = conf;
this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER);
}
/**
* Populate the cache for this group.
*/
FSHLog populateCache(String groupName) throws IOException {
boolean isMeta = META_WAL_PROVIDER_ID.equals(providerId);
String hlogPrefix;
List<WALActionsListener> listeners;
if (isMeta) {
hlogPrefix = this.providerId;
// don't watch log roll for meta
listeners = Collections.<WALActionsListener> singletonList(new MetricsWAL());
private WALProvider createProvider(String group) throws IOException {
if (META_WAL_PROVIDER_ID.equals(providerId)) {
return factory.createProvider(providerClass, listeners, META_WAL_PROVIDER_ID);
} else {
hlogPrefix = groupName;
listeners = this.listeners;
return factory.createProvider(providerClass, listeners, group);
}
FSHLog log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
DefaultWALProvider.getWALDirectoryName(providerId), HConstants.HREGION_OLDLOGDIR_NAME,
conf, listeners, true, hlogPrefix, isMeta ? META_WAL_PROVIDER_ID : null);
cached.put(groupName, log);
logs.add(log);
return log;
}
private WAL getWAL(final String group) throws IOException {
WAL log = cached.get(group);
if (null == log) {
// only lock when need to create wal, and need to lock since
// creating hlog on fs is time consuming
synchronized (this.walCacheLock) {
log = cached.get(group);// check again
if (null == log) {
log = populateCache(group);
WALProvider provider = cached.get(group);
if (provider == null) {
Lock lock = createLock.getLock(group.hashCode()).writeLock();
lock.lock();
try {
provider = cached.get(group);
if (provider == null) {
provider = createProvider(group);
cached.put(group, provider);
}
} finally {
lock.unlock();
}
}
return log;
return provider.getWAL(null, null);
}
@Override
@ -214,15 +201,15 @@ class RegionGroupingProvider implements WALProvider {
public void shutdown() throws IOException {
// save the last exception and rethrow
IOException failure = null;
synchronized (logs) {
for (FSHLog wal : logs) {
try {
wal.shutdown();
} catch (IOException exception) {
LOG.error("Problem shutting down log '" + wal + "': " + exception.getMessage());
LOG.debug("Details of problem shutting down log '" + wal + "'", exception);
failure = exception;
for (WALProvider provider: cached.values()) {
try {
provider.shutdown();
} catch (IOException e) {
LOG.error("Problem shutting down wal provider '" + provider + "': " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Details of problem shutting down wal provider '" + provider + "'", e);
}
failure = e;
}
}
if (failure != null) {
@ -234,15 +221,15 @@ class RegionGroupingProvider implements WALProvider {
public void close() throws IOException {
// save the last exception and rethrow
IOException failure = null;
synchronized (logs) {
for (FSHLog wal : logs) {
try {
wal.close();
} catch (IOException exception) {
LOG.error("Problem closing log '" + wal + "': " + exception.getMessage());
LOG.debug("Details of problem closing wal '" + wal + "'", exception);
failure = exception;
for (WALProvider provider : cached.values()) {
try {
provider.close();
} catch (IOException e) {
LOG.error("Problem closing wal provider '" + provider + "': " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Details of problem closing wal provider '" + provider + "'", e);
}
failure = e;
}
}
if (failure != null) {
@ -262,10 +249,8 @@ class RegionGroupingProvider implements WALProvider {
@Override
public long getNumLogFiles() {
long numLogFiles = 0;
synchronized (logs) {
for (FSHLog wal : logs) {
numLogFiles += wal.getNumLogFiles();
}
for (WALProvider provider : cached.values()) {
numLogFiles += provider.getNumLogFiles();
}
return numLogFiles;
}
@ -273,12 +258,9 @@ class RegionGroupingProvider implements WALProvider {
@Override
public long getLogFileSize() {
long logFileSize = 0;
synchronized (logs) {
for (FSHLog wal : logs) {
logFileSize += wal.getLogFileSize();
}
for (WALProvider provider : cached.values()) {
logFileSize += provider.getLogFileSize();
}
return logFileSize;
}
}

View File

@ -127,37 +127,45 @@ public class WALFactory {
factoryId = SINGLETON_ID;
}
/**
* instantiate a provider from a config property.
* requires conf to have already been set (as well as anything the provider might need to read).
*/
WALProvider getProvider(final String key, final String defaultValue,
final List<WALActionsListener> listeners, final String providerId) throws IOException {
Class<? extends WALProvider> clazz;
Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
try {
clazz = Providers.valueOf(conf.get(key, defaultValue)).clazz;
return Providers.valueOf(conf.get(key, defaultValue)).clazz;
} catch (IllegalArgumentException exception) {
// Fall back to them specifying a class name
// Note that the passed default class shouldn't actually be used, since the above only fails
// when there is a config value present.
clazz = conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
return conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
}
}
WALProvider createProvider(Class<? extends WALProvider> clazz,
List<WALActionsListener> listeners, String providerId) throws IOException {
LOG.info("Instantiating WALProvider of type " + clazz);
try {
final WALProvider result = clazz.newInstance();
result.init(this, conf, listeners, providerId);
return result;
} catch (InstantiationException exception) {
LOG.error("couldn't set up WALProvider, check config key " + key);
LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
LOG.debug("Exception details for failure to load WALProvider.", exception);
throw new IOException("couldn't set up WALProvider", exception);
} catch (IllegalAccessException exception) {
LOG.error("couldn't set up WALProvider, check config key " + key);
LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
LOG.debug("Exception details for failure to load WALProvider.", exception);
throw new IOException("couldn't set up WALProvider", exception);
}
}
/**
* instantiate a provider from a config property.
* requires conf to have already been set (as well as anything the provider might need to read).
*/
WALProvider getProvider(final String key, final String defaultValue,
final List<WALActionsListener> listeners, final String providerId) throws IOException {
Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue);
return createProvider(clazz, listeners, providerId);
}
/**
* @param conf must not be null, will keep a reference to read params in later reader/writer
* instances.

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.multiwal;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationEndpointWithMultipleAsyncWAL extends TestReplicationEndpoint {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
TestReplicationEndpoint.setUpBeforeClass();
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.multiwal;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@ -29,6 +30,7 @@ public class TestReplicationEndpointWithMultipleWAL extends TestReplicationEndpo
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
TestReplicationEndpoint.setUpBeforeClass();
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.multiwal;
import org.apache.hadoop.hbase.replication.TestReplicationKillMasterRSCompressed;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL extends
TestReplicationKillMasterRSCompressed {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
TestReplicationKillMasterRSCompressed.setUpBeforeClass();
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.multiwal;
import org.apache.hadoop.hbase.replication.TestReplicationKillMasterRSCompressed;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@ -30,6 +31,7 @@ public class TestReplicationKillMasterRSCompressedWithMultipleWAL extends
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
TestReplicationKillMasterRSCompressed.setUpBeforeClass();
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.multiwal;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithMultipleAsyncWAL extends TestReplicationSyncUpTool {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
TestReplicationBase.setUpBeforeClass();
}
}

View File

@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@ -30,6 +31,7 @@ public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyn
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
TestReplicationBase.setUpBeforeClass();
}
}

View File

@ -18,80 +18,92 @@
*/
package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.DEFAULT_NUM_REGION_GROUPS;
import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.NUM_REGION_GROUPS;
import static org.apache.hadoop.hbase.wal.RegionGroupingProvider.*;
import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.NUM_REGION_GROUPS;
import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.DEFAULT_NUM_REGION_GROUPS;
import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
import static org.apache.hadoop.hbase.wal.RegionGroupingProvider.REGION_GROUPING_STRATEGY;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@Category({RegionServerTests.class, LargeTests.class})
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, LargeTests.class })
public class TestBoundedRegionGroupingStrategy {
protected static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingStrategy.class);
private static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingStrategy.class);
@Rule
public TestName currentTest = new TestName();
protected static Configuration conf;
protected static FileSystem fs;
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Configuration CONF;
private static DistributedFileSystem FS;
@Parameter
public String walProvider;
@Parameters(name = "{index}: delegate-provider={0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" });
}
@Before
public void setUp() throws Exception {
FileStatus[] entries = fs.listStatus(new Path("/"));
for (FileStatus dir : entries) {
fs.delete(dir.getPath(), true);
}
CONF.set(DELEGATE_PROVIDER, walProvider);
}
@After
public void tearDown() throws Exception {
FileStatus[] entries = FS.listStatus(new Path("/"));
for (FileStatus dir : entries) {
FS.delete(dir.getPath(), true);
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = TEST_UTIL.getConfiguration();
CONF = TEST_UTIL.getConfiguration();
// Make block sizes small.
conf.setInt("dfs.blocksize", 1024 * 1024);
CONF.setInt("dfs.blocksize", 1024 * 1024);
// quicker heartbeat interval for faster DN death notification
conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
conf.setInt("dfs.heartbeat.interval", 1);
conf.setInt("dfs.client.socket-timeout", 5000);
CONF.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
CONF.setInt("dfs.heartbeat.interval", 1);
CONF.setInt("dfs.client.socket-timeout", 5000);
// faster failover with cluster.shutdown();fs.close() idiom
conf.setInt("hbase.ipc.client.connect.max.retries", 1);
conf.setInt("dfs.client.block.recovery.retries", 1);
conf.setInt("hbase.ipc.client.connection.maxidletime", 500);
CONF.setInt("hbase.ipc.client.connect.max.retries", 1);
CONF.setInt("dfs.client.block.recovery.retries", 1);
CONF.setInt("hbase.ipc.client.connection.maxidletime", 500);
conf.setClass(WAL_PROVIDER, RegionGroupingProvider.class, WALProvider.class);
conf.set(REGION_GROUPING_STRATEGY, RegionGroupingProvider.Strategies.bounded.name());
CONF.setClass(WAL_PROVIDER, RegionGroupingProvider.class, WALProvider.class);
CONF.set(REGION_GROUPING_STRATEGY, RegionGroupingProvider.Strategies.bounded.name());
TEST_UTIL.startMiniDFSCluster(3);
fs = TEST_UTIL.getDFSCluster().getFileSystem();
FS = TEST_UTIL.getDFSCluster().getFileSystem();
}
@AfterClass
@ -106,8 +118,8 @@ public class TestBoundedRegionGroupingStrategy {
public void testConcurrentWrites() throws Exception {
// Run the WPE tool with three threads writing 3000 edits each concurrently.
// When done, verify that all edits were written.
int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF),
new String[] { "-threads", "3", "-verify", "-noclosefs", "-iterations", "3000" });
assertEquals(0, errCode);
}
@ -117,39 +129,39 @@ public class TestBoundedRegionGroupingStrategy {
@Test
public void testMoreRegionsThanBound() throws Exception {
final String parallelism = Integer.toString(DEFAULT_NUM_REGION_GROUPS * 2);
int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
"-regions", parallelism});
int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF),
new String[] { "-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
"-regions", parallelism });
assertEquals(0, errCode);
}
@Test
public void testBoundsGreaterThanDefault() throws Exception {
final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
final int temp = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
try {
conf.setInt(NUM_REGION_GROUPS, temp*4);
final String parallelism = Integer.toString(temp*4);
int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
"-regions", parallelism});
CONF.setInt(NUM_REGION_GROUPS, temp * 4);
final String parallelism = Integer.toString(temp * 4);
int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF),
new String[] { "-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
"-regions", parallelism });
assertEquals(0, errCode);
} finally {
conf.setInt(NUM_REGION_GROUPS, temp);
CONF.setInt(NUM_REGION_GROUPS, temp);
}
}
@Test
public void testMoreRegionsThanBoundWithBoundsGreaterThanDefault() throws Exception {
final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
final int temp = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
try {
conf.setInt(NUM_REGION_GROUPS, temp*4);
final String parallelism = Integer.toString(temp*4*2);
int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
"-regions", parallelism});
CONF.setInt(NUM_REGION_GROUPS, temp * 4);
final String parallelism = Integer.toString(temp * 4 * 2);
int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF),
new String[] { "-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
"-regions", parallelism });
assertEquals(0, errCode);
} finally {
conf.setInt(NUM_REGION_GROUPS, temp);
CONF.setInt(NUM_REGION_GROUPS, temp);
}
}
@ -158,32 +170,33 @@ public class TestBoundedRegionGroupingStrategy {
*/
@Test
public void setMembershipDedups() throws IOException {
final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
final int temp = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
WALFactory wals = null;
try {
conf.setInt(NUM_REGION_GROUPS, temp*4);
CONF.setInt(NUM_REGION_GROUPS, temp * 4);
// Set HDFS root directory for storing WAL
FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
FSUtils.setRootDir(CONF, TEST_UTIL.getDataTestDirOnTestFS());
wals = new WALFactory(conf, null, currentTest.getMethodName());
final Set<WAL> seen = new HashSet<WAL>(temp*4);
wals = new WALFactory(CONF, null, "setMembershipDedups");
final Set<WAL> seen = new HashSet<WAL>(temp * 4);
final Random random = new Random();
int count = 0;
// we know that this should see one of the wals more than once
for (int i = 0; i < temp*8; i++) {
for (int i = 0; i < temp * 8; i++) {
final WAL maybeNewWAL = wals.getWAL(Bytes.toBytes(random.nextInt()), null);
LOG.info("Iteration " + i + ", checking wal " + maybeNewWAL);
if (seen.add(maybeNewWAL)) {
count++;
}
}
assertEquals("received back a different number of WALs that are not equal() to each other " +
"than the bound we placed.", temp*4, count);
assertEquals("received back a different number of WALs that are not equal() to each other "
+ "than the bound we placed.",
temp * 4, count);
} finally {
if (wals != null) {
wals.close();
}
conf.setInt(NUM_REGION_GROUPS, temp);
CONF.setInt(NUM_REGION_GROUPS, temp);
}
}
}