Revert HDFS-3058

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1342494 13f79535-47bb-0310-9956-ffa450edef68
Author: Eli Collins, 2012-05-25 03:57:56 +00:00
parent 8341fb21c1
commit 251c159d93
12 changed files with 210 additions and 680 deletions

View File: CHANGES.txt (hadoop-hdfs)

@@ -113,9 +113,6 @@ Release 2.0.1-alpha - UNRELEASED
     HDFS-3460. HttpFS proxyuser validation with Kerberos ON uses full
     principal name. (tucu)

-    HDFS-3058. HA: Bring BookKeeperJournalManager up to date with HA changes.
-    (Ivan Kelly via umamahesh)
-
 Release 2.0.0-alpha - UNRELEASED

   INCOMPATIBLE CHANGES

View File: pom.xml (bkjournal contrib)

@@ -54,12 +54,6 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-server</artifactId>
@@ -70,11 +64,6 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
   <profiles>
     <profile>

View File: BookKeeperEditLogInputStream.java

@@ -41,7 +41,6 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
   private final long firstTxId;
   private final long lastTxId;
   private final int logVersion;
-  private final boolean inProgress;
   private final LedgerHandle lh;

   private final FSEditLogOp.Reader reader;
@@ -70,7 +69,6 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
     this.firstTxId = metadata.getFirstTxId();
     this.lastTxId = metadata.getLastTxId();
     this.logVersion = metadata.getVersion();
-    this.inProgress = metadata.isInProgress();

     BufferedInputStream bin = new BufferedInputStream(
         new LedgerInputStream(lh, firstBookKeeperEntry));
@@ -125,28 +123,10 @@ public String getName() {
         lh.toString(), firstTxId, lastTxId);
   }

-  // TODO(HA): Test this.
   @Override
   public boolean isInProgress() {
-    return inProgress;
-  }
-
-  /**
-   * Skip forward to specified transaction id.
-   * Currently we do this by just iterating forward.
-   * If this proves to be too expensive, this can be reimplemented
-   * with a binary search over bk entries
-   */
-  public void skipTo(long txId) throws IOException {
-    long numToSkip = getFirstTxId() - txId;
-
-    FSEditLogOp op = null;
-    for (long i = 0; i < numToSkip; i++) {
-      op = readOp();
-    }
-    if (op != null && op.getTransactionId() != txId-1) {
-      throw new IOException("Corrupt stream, expected txid "
-          + (txId-1) + ", got " + op.getTransactionId());
-    }
+    return true;
   }

   /**

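The skipTo() method removed above justifies its linear scan with a comment that a binary search over BookKeeper entries would be the faster alternative. For illustration, a minimal sketch of that suggested binary search, assuming a hypothetical firstTxIdOfEntry(entryId) lookup that reports the first txid stored in a given entry (no such helper exists in the code shown):

import java.io.IOException;

interface EntryTxIdLookup {
  // Assumed helper: first transaction id stored in the given BK entry.
  long firstTxIdOfEntry(long entryId) throws IOException;
}

final class BinarySearchSkipSketch {
  /** Returns the id of the last entry whose first txid is <= txId. */
  static long entryContaining(EntryTxIdLookup lookup, long lastEntryId,
                              long txId) throws IOException {
    long lo = 0, hi = lastEntryId, candidate = 0;
    while (lo <= hi) {
      long mid = lo + (hi - lo) / 2;
      if (lookup.firstTxIdOfEntry(mid) <= txId) {
        candidate = mid; // this entry may hold txId; a later one might too
        lo = mid + 1;
      } else {
        hi = mid - 1;    // entry starts past txId; search to the left
      }
    }
    return candidate;
  }
}

A reader could then seek directly to that entry and iterate only within it, instead of replaying every preceding operation.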
View File: BookKeeperJournalManager.java

@@ -233,14 +233,11 @@ public EditLogOutputStream startLogSegment(long txId) throws IOException {
        */
       l.write(zkc, znodePath);

-      maxTxId.store(txId);
       return new BookKeeperEditLogOutputStream(conf, currentLedger, wl);
     } catch (Exception e) {
       if (currentLedger != null) {
         try {
-          long id = currentLedger.getId();
           currentLedger.close();
-          bkc.deleteLedger(id);
         } catch (Exception e2) {
           //log & ignore, an IOException will be thrown soon
           LOG.error("Error closing ledger", e2);
@@ -387,8 +384,8 @@ long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
             break;
           }
         }
-        count += (lastTxId - l.getFirstTxId()) + 1;
-        expectedStart = lastTxId + 1;
+        count += (l.getLastTxId() - l.getFirstTxId()) + 1;
+        expectedStart = l.getLastTxId() + 1;
       }
     }
     return count;
@@ -407,7 +404,7 @@ public void recoverUnfinalizedSegments() throws IOException {
         String znode = ledgerPath + "/" + child;
         EditLogLedgerMetadata l
             = EditLogLedgerMetadata.read(zkc, znode);
-        long endTxId = recoverLastTxId(l, true);
+        long endTxId = recoverLastTxId(l);
         if (endTxId == HdfsConstants.INVALID_TXID) {
           LOG.error("Unrecoverable corruption has occurred in segment "
               + l.toString() + " at path " + znode
@@ -477,19 +474,11 @@ public void setOutputBufferCapacity(int size) {
    * Find the id of the last edit log transaction written to an edit log
    * ledger.
    */
-  private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
-      throws IOException {
+  private long recoverLastTxId(EditLogLedgerMetadata l) throws IOException {
     try {
-      LedgerHandle lh = null;
-      if (fence) {
-        lh = bkc.openLedger(l.getLedgerId(),
-                            BookKeeper.DigestType.MAC,
-                            digestpw.getBytes());
-      } else {
-        lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
-                                      BookKeeper.DigestType.MAC,
-                                      digestpw.getBytes());
-      }
+      LedgerHandle lh = bkc.openLedger(l.getLedgerId(),
+          BookKeeper.DigestType.MAC,
+          digestpw.getBytes());
       long lastAddConfirmed = lh.getLastAddConfirmed();
       BookKeeperEditLogInputStream in
           = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);

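For context on the fencing logic being reverted here: BookKeeper's openLedger() runs ledger recovery and fences the ledger, so a writer that is still alive can no longer append, while openLedgerNoRecovery() merely reads without disturbing the writer. A minimal sketch of a fenced open with the standard BookKeeper client, assuming a reachable ZooKeeper ensemble on 127.0.0.1:2181 and an existing ledger id supplied as an argument:

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;

public class FencedOpenSketch {
  public static void main(String[] args) throws Exception {
    long ledgerId = Long.parseLong(args[0]);
    BookKeeper bkc = new BookKeeper("127.0.0.1:2181");
    // openLedger() fences: any further adds by a previous writer will fail,
    // which is what makes single-writer recovery safe.
    LedgerHandle lh = bkc.openLedger(ledgerId,
        BookKeeper.DigestType.MAC, "password".getBytes());
    System.out.println("last add confirmed: " + lh.getLastAddConfirmed());
    lh.close(); // client shutdown omitted for brevity
  }
}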
View File: BKJMUtil.java (deleted)

@@ -1,182 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import static org.junit.Assert.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.KeeperException;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.LocalBookKeeper;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.io.IOException;
import java.io.File;
/**
* Utility class for setting up bookkeeper ensembles
* and bringing individual bookies up and down
*/
class BKJMUtil {
protected static final Log LOG = LogFactory.getLog(BKJMUtil.class);
int nextPort = 6000; // next port for additionally created bookies
private Thread bkthread = null;
private final static String zkEnsemble = "127.0.0.1:2181";
int numBookies;
BKJMUtil(final int numBookies) throws Exception {
this.numBookies = numBookies;
bkthread = new Thread() {
public void run() {
try {
String[] args = new String[1];
args[0] = String.valueOf(numBookies);
LOG.info("Starting bk");
LocalBookKeeper.main(args);
} catch (InterruptedException e) {
// go away quietly
} catch (Exception e) {
LOG.error("Error starting local bk", e);
}
}
};
}
void start() throws Exception {
bkthread.start();
if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
throw new Exception("Error starting zookeeper/bookkeeper");
}
assertEquals("Not all bookies started",
numBookies, checkBookiesUp(numBookies, 10));
}
void teardown() throws Exception {
if (bkthread != null) {
bkthread.interrupt();
bkthread.join();
}
}
static ZooKeeper connectZooKeeper()
throws IOException, KeeperException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
if (!latch.await(3, TimeUnit.SECONDS)) {
throw new IOException("Zookeeper took too long to connect");
}
return zkc;
}
static URI createJournalURI(String path) throws Exception {
return URI.create("bookkeeper://" + zkEnsemble + path);
}
static void addJournalManagerDefinition(Configuration conf) {
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".bookkeeper",
"org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager");
}
BookieServer newBookie() throws Exception {
int port = nextPort++;
ServerConfiguration bookieConf = new ServerConfiguration();
bookieConf.setBookiePort(port);
File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_",
"test");
tmpdir.delete();
tmpdir.mkdir();
bookieConf.setZkServers(zkEnsemble);
bookieConf.setJournalDirName(tmpdir.getPath());
bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() });
BookieServer b = new BookieServer(bookieConf);
b.start();
for (int i = 0; i < 10 && !b.isRunning(); i++) {
Thread.sleep(10000);
}
if (!b.isRunning()) {
throw new IOException("Bookie would not start");
}
return b;
}
/**
* Check that a number of bookies are available
* @param count number of bookies required
* @param timeout number of seconds to wait for bookies to start
* @throws IOException if bookies are not started by the time the timeout hits
*/
int checkBookiesUp(int count, int timeout) throws Exception {
ZooKeeper zkc = connectZooKeeper();
try {
boolean up = false;
int mostRecentSize = 0;
for (int i = 0; i < timeout; i++) {
try {
List<String> children = zkc.getChildren("/ledgers/available",
false);
mostRecentSize = children.size();
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + mostRecentSize + " bookies up, "
+ "waiting for " + count);
if (LOG.isTraceEnabled()) {
for (String child : children) {
LOG.trace(" server: " + child);
}
}
}
if (mostRecentSize == count) {
up = true;
break;
}
} catch (KeeperException e) {
// ignore
}
Thread.sleep(1000);
}
return mostRecentSize;
} finally {
zkc.close();
}
}
}

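The tests that follow used BKJMUtil as their shared fixture. A condensed sketch of that usage, taken from the deleted code above (the MiniDFSCluster wiring is elided):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class BkjmFixtureSketch {
  public static void main(String[] args) throws Exception {
    BKJMUtil bkutil = new BKJMUtil(3); // three local bookies
    bkutil.start();                    // starts ZooKeeper and the bookies
    try {
      Configuration conf = new Configuration();
      // Point the shared edits dir at a bookkeeper:// journal URI and
      // register the journal manager implementation for that scheme.
      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
          BKJMUtil.createJournalURI("/example").toString());
      BKJMUtil.addJournalManagerDefinition(conf);
      // ... build a MiniDFSCluster against conf, as the tests below do ...
    } finally {
      bkutil.teardown();
    }
  }
}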
View File: TestBookKeeperAsHASharedDir.java (deleted)

@@ -1,267 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import static org.junit.Assert.*;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verify;
/**
* Integration test to ensure that the BookKeeper JournalManager
* works for HDFS Namenode HA
*/
public class TestBookKeeperAsHASharedDir {
static final Log LOG = LogFactory.getLog(TestBookKeeperAsHASharedDir.class);
private static BKJMUtil bkutil;
static int numBookies = 3;
private static final String TEST_FILE_DATA = "HA BookKeeperJournalManager";
@BeforeClass
public static void setupBookkeeper() throws Exception {
bkutil = new BKJMUtil(numBookies);
bkutil.start();
}
@AfterClass
public static void teardownBookkeeper() throws Exception {
bkutil.teardown();
}
/**
* Test simple HA failover usecase with BK
*/
@Test
public void testFailoverWithBK() throws Exception {
Runtime mockRuntime1 = mock(Runtime.class);
Runtime mockRuntime2 = mock(Runtime.class);
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/hotfailover").toString());
BKJMUtil.addJournalManagerDefinition(conf);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.manageNameDfsSharedDirs(false)
.build();
NameNode nn1 = cluster.getNameNode(0);
NameNode nn2 = cluster.getNameNode(1);
FSEditLogTestUtil.setRuntimeForEditLog(nn1, mockRuntime1);
FSEditLogTestUtil.setRuntimeForEditLog(nn2, mockRuntime2);
cluster.waitActive();
cluster.transitionToActive(0);
Path p = new Path("/testBKJMfailover");
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
fs.mkdirs(p);
cluster.shutdownNameNode(0);
cluster.transitionToActive(1);
assertTrue(fs.exists(p));
} finally {
verify(mockRuntime1, times(0)).exit(anyInt());
verify(mockRuntime2, times(0)).exit(anyInt());
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test HA failover, where BK, as the shared storage, fails.
* Once it becomes available again, a standby can come up.
* Verify that any write happening after the BK fail is not
* available on the standby.
*/
@Test
public void testFailoverWithFailingBKCluster() throws Exception {
int ensembleSize = numBookies + 1;
BookieServer newBookie = bkutil.newBookie();
assertEquals("New bookie didn't start",
ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
BookieServer replacementBookie = null;
Runtime mockRuntime1 = mock(Runtime.class);
Runtime mockRuntime2 = mock(Runtime.class);
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/hotfailoverWithFail").toString());
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
ensembleSize);
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
ensembleSize);
BKJMUtil.addJournalManagerDefinition(conf);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.manageNameDfsSharedDirs(false)
.build();
NameNode nn1 = cluster.getNameNode(0);
NameNode nn2 = cluster.getNameNode(1);
FSEditLogTestUtil.setRuntimeForEditLog(nn1, mockRuntime1);
FSEditLogTestUtil.setRuntimeForEditLog(nn2, mockRuntime2);
cluster.waitActive();
cluster.transitionToActive(0);
Path p1 = new Path("/testBKJMFailingBKCluster1");
Path p2 = new Path("/testBKJMFailingBKCluster2");
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
fs.mkdirs(p1);
newBookie.shutdown(); // will take down shared storage
assertEquals("New bookie didn't stop",
numBookies, bkutil.checkBookiesUp(numBookies, 10));
// mkdirs will "succeed", but the NN will have called runtime.exit
fs.mkdirs(p2);
verify(mockRuntime1, atLeastOnce()).exit(anyInt());
verify(mockRuntime2, times(0)).exit(anyInt());
cluster.shutdownNameNode(0);
try {
cluster.transitionToActive(1);
fail("Shouldn't have been able to transition with bookies down");
} catch (ServiceFailedException e) {
assertTrue("Wrong exception",
e.getMessage().contains("Failed to start active services"));
}
verify(mockRuntime2, atLeastOnce()).exit(anyInt());
replacementBookie = bkutil.newBookie();
assertEquals("Replacement bookie didn't start",
ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
cluster.transitionToActive(1); // should work fine now
assertTrue(fs.exists(p1));
assertFalse(fs.exists(p2));
} finally {
newBookie.shutdown();
if (replacementBookie != null) {
replacementBookie.shutdown();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test that two namenodes can't become primary at the same
* time.
*/
@Test
public void testMultiplePrimariesStarted() throws Exception {
Runtime mockRuntime1 = mock(Runtime.class);
Runtime mockRuntime2 = mock(Runtime.class);
Path p1 = new Path("/testBKJMMultiplePrimary");
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/hotfailoverMultiple").toString());
BKJMUtil.addJournalManagerDefinition(conf);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.manageNameDfsSharedDirs(false)
.build();
NameNode nn1 = cluster.getNameNode(0);
NameNode nn2 = cluster.getNameNode(1);
FSEditLogTestUtil.setRuntimeForEditLog(nn1, mockRuntime1);
FSEditLogTestUtil.setRuntimeForEditLog(nn2, mockRuntime2);
cluster.waitActive();
cluster.transitionToActive(0);
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
fs.mkdirs(p1);
nn1.getRpcServer().rollEditLog();
try {
cluster.transitionToActive(1);
fail("Shouldn't have been able to start two primaries"
+ " with single shared storage");
} catch (ServiceFailedException sfe) {
assertTrue("Wrong exception",
sfe.getMessage().contains("Failed to start active services"));
}
} finally {
verify(mockRuntime1, times(0)).exit(anyInt());
verify(mockRuntime2, atLeastOnce()).exit(anyInt());
if (cluster != null) {
cluster.shutdown();
}
}
}
}

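These tests inject a Mockito mock in place of the real Runtime so that a fatal journal failure, which normally calls Runtime.exit, can be asserted on instead of killing the test JVM. A self-contained sketch of the same pattern, using a hypothetical Journal class (not from this codebase) to stand in for the edit log:

import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class MockRuntimeSketch {
  // Hypothetical component that aborts through an injectable Runtime.
  static class Journal {
    private final Runtime runtime;
    Journal(Runtime runtime) { this.runtime = runtime; }
    void onFatalError() { runtime.exit(1); }
  }

  public static void main(String[] args) {
    Runtime mockRuntime = mock(Runtime.class);
    Journal journal = new Journal(mockRuntime);
    journal.onFatalError();
    // The abort is recorded on the mock rather than terminating the JVM.
    verify(mockRuntime, times(1)).exit(anyInt());
  }
}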
View File: TestBookKeeperHACheckpoints.java (deleted)

@@ -1,93 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import org.apache.hadoop.hdfs.server.namenode.ha.TestStandbyCheckpoints;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.junit.Before;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.AfterClass;
/**
* Runs the same tests as TestStandbyCheckpoints, but
* using a bookkeeper journal manager as the shared directory
*/
public class TestBookKeeperHACheckpoints extends TestStandbyCheckpoints {
private static BKJMUtil bkutil = null;
static int numBookies = 3;
static int journalCount = 0;
@Override
@Before
public void setupCluster() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/checkpointing" + journalCount++)
.toString());
BKJMUtil.addJournalManagerDefinition(conf);
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
.addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology)
.numDataNodes(0)
.manageNameDfsSharedDirs(false)
.build();
cluster.waitActive();
nn0 = cluster.getNameNode(0);
nn1 = cluster.getNameNode(1);
fs = HATestUtil.configureFailoverFs(cluster, conf);
cluster.transitionToActive(0);
}
@BeforeClass
public static void startBK() throws Exception {
journalCount = 0;
bkutil = new BKJMUtil(numBookies);
bkutil.start();
}
@AfterClass
public static void shutdownBK() throws Exception {
if (bkutil != null) {
bkutil.teardown();
}
}
@Override
public void testCheckpointCancellation() throws Exception {
// Overridden as the implementation in the superclass assumes that writes
// are to a file. This should be fixed at some point
}
}

View File: TestBookKeeperJournalManager.java

@@ -18,23 +18,55 @@
 package org.apache.hadoop.contrib.bkjournal;

 import static org.junit.Assert.*;

+import java.net.URI;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.LocalBookKeeper;
+import java.io.RandomAccessFile;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.SecurityUtil;
 import org.junit.Test;
 import org.junit.Before;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.AfterClass;
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.JournalManager;
-import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.KeeperException;
+import com.google.common.collect.ImmutableList;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,26 +75,125 @@ public class TestBookKeeperJournalManager {
   static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);

   private static final long DEFAULT_SEGMENT_SIZE = 1000;
+  private static final String zkEnsemble = "localhost:2181";
+  final static private int numBookies = 5;
+  private static Thread bkthread;
   protected static Configuration conf = new Configuration();
   private ZooKeeper zkc;
-  private static BKJMUtil bkutil;
-  static int numBookies = 3;
+  static int nextPort = 6000; // next port for additionally created bookies

+  private static ZooKeeper connectZooKeeper(String ensemble)
+      throws IOException, KeeperException, InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
+      public void process(WatchedEvent event) {
+        if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
+          latch.countDown();
+        }
+      }
+    });
+    if (!latch.await(3, TimeUnit.SECONDS)) {
+      throw new IOException("Zookeeper took too long to connect");
+    }
+    return zkc;
+  }
+
+  private static BookieServer newBookie() throws Exception {
+    int port = nextPort++;
+    ServerConfiguration bookieConf = new ServerConfiguration();
+    bookieConf.setBookiePort(port);
+    File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_",
+                                      "test");
+    tmpdir.delete();
+    tmpdir.mkdir();
+
+    bookieConf.setZkServers(zkEnsemble);
+    bookieConf.setJournalDirName(tmpdir.getPath());
+    bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() });
+
+    BookieServer b = new BookieServer(bookieConf);
+    b.start();
+    for (int i = 0; i < 10 && !b.isRunning(); i++) {
+      Thread.sleep(10000);
+    }
+    if (!b.isRunning()) {
+      throw new IOException("Bookie would not start");
+    }
+    return b;
+  }
+
+  /**
+   * Check that a number of bookies are available
+   * @param count number of bookies required
+   * @param timeout number of seconds to wait for bookies to start
+   * @throws IOException if bookies are not started by the time the timeout hits
+   */
+  private static int checkBookiesUp(int count, int timeout) throws Exception {
+    ZooKeeper zkc = connectZooKeeper(zkEnsemble);
+    try {
+      boolean up = false;
+      int mostRecentSize = 0;
+      for (int i = 0; i < timeout; i++) {
+        try {
+          List<String> children = zkc.getChildren("/ledgers/available",
+                                                  false);
+          mostRecentSize = children.size();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found " + mostRecentSize + " bookies up, "
+                      + "waiting for " + count);
+            if (LOG.isTraceEnabled()) {
+              for (String child : children) {
+                LOG.trace(" server: " + child);
+              }
+            }
+          }
+          if (mostRecentSize == count) {
+            up = true;
+            break;
+          }
+        } catch (KeeperException e) {
+          // ignore
+        }
+        Thread.sleep(1000);
+      }
+      return mostRecentSize;
+    } finally {
+      zkc.close();
+    }
+  }
+
   @BeforeClass
   public static void setupBookkeeper() throws Exception {
-    bkutil = new BKJMUtil(numBookies);
-    bkutil.start();
-  }
-
-  @AfterClass
-  public static void teardownBookkeeper() throws Exception {
-    bkutil.teardown();
+    bkthread = new Thread() {
+        public void run() {
+          try {
+            String[] args = new String[1];
+            args[0] = String.valueOf(numBookies);
+            LOG.info("Starting bk");
+            LocalBookKeeper.main(args);
+          } catch (InterruptedException e) {
+            // go away quietly
+          } catch (Exception e) {
+            LOG.error("Error starting local bk", e);
+          }
+        }
+      };
+    bkthread.start();
+
+    if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
+      throw new Exception("Error starting zookeeper/bookkeeper");
+    }
+    assertEquals("Not all bookies started",
+                 numBookies, checkBookiesUp(numBookies, 10));
   }

   @Before
   public void setup() throws Exception {
-    zkc = BKJMUtil.connectZooKeeper();
+    zkc = connectZooKeeper(zkEnsemble);
   }

   @After
@@ -70,10 +201,18 @@ public void teardown() throws Exception {
     zkc.close();
   }

+  @AfterClass
+  public static void teardownBookkeeper() throws Exception {
+    if (bkthread != null) {
+      bkthread.interrupt();
+      bkthread.join();
+    }
+  }
+
   @Test
   public void testSimpleWrite() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"));
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite"));
     long txid = 1;
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
@@ -93,7 +232,7 @@ public void testSimpleWrite() throws Exception {
   @Test
   public void testNumberOfTransactions() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-txncount"));
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount"));
     long txid = 1;
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
@@ -111,7 +250,7 @@ public void testNumberOfTransactions() throws Exception {
   @Test
   public void testNumberOfTransactionsWithGaps() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-gaps"));
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps"));
     long txid = 1;
     for (long i = 0; i < 3; i++) {
       long start = txid;
@@ -123,11 +262,9 @@ public void testNumberOfTransactionsWithGaps() throws Exception {
       }
       out.close();
       bkjm.finalizeLogSegment(start, txid-1);
-      assertNotNull(
-          zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
+      assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
     }
-    zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1,
-        DEFAULT_SEGMENT_SIZE*2), -1);
+    zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1);

     long numTrans = bkjm.getNumberOfTransactions(1, true);
     assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
@@ -146,7 +283,7 @@ public void testNumberOfTransactionsWithGaps() throws Exception {
   @Test
   public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"));
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd"));
     long txid = 1;
     for (long i = 0; i < 3; i++) {
       long start = txid;
@@ -159,8 +296,7 @@ public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
       out.close();
       bkjm.finalizeLogSegment(start, (txid-1));
-      assertNotNull(
-          zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
+      assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
     }
     long start = txid;
     EditLogOutputStream out = bkjm.startLogSegment(start);
@@ -185,7 +321,7 @@ public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
   @Test
   public void testWriteRestartFrom1() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"));
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1"));
     long txid = 1;
     long start = txid;
     EditLogOutputStream out = bkjm.startLogSegment(txid);
@@ -240,9 +376,9 @@ public void testWriteRestartFrom1() throws Exception {
   public void testTwoWriters() throws Exception {
     long start = 1;
     BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
     BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));

     EditLogOutputStream out1 = bkjm1.startLogSegment(start);
     try {
@@ -256,7 +392,7 @@ public void testTwoWriters() throws Exception {
   @Test
   public void testSimpleRead() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-simpleread"));
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread"));
     long txid = 1;
     final long numTransactions = 10000;
     EditLogOutputStream out = bkjm.startLogSegment(1);
@@ -281,7 +417,7 @@ public void testSimpleRead() throws Exception {
   @Test
   public void testSimpleRecovery() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"));
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery"));
     EditLogOutputStream out = bkjm.startLogSegment(1);
     long txid = 1;
     for (long i = 1 ; i <= 100; i++) {
@@ -312,13 +448,13 @@
    */
   @Test
   public void testAllBookieFailure() throws Exception {
-    BookieServer bookieToFail = bkutil.newBookie();
+    BookieServer bookieToFail = newBookie();
     BookieServer replacementBookie = null;

     try {
       int ensembleSize = numBookies + 1;
       assertEquals("New bookie didn't start",
-                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
+                   ensembleSize, checkBookiesUp(ensembleSize, 10));

       // ensure that the journal manager has to use all bookies,
       // so that a failure will fail the journal manager
@@ -329,7 +465,8 @@ public void testAllBookieFailure() throws Exception {
                   ensembleSize);
       long txid = 1;
       BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-          BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"));
+          URI.create("bookkeeper://" + zkEnsemble
+                     + "/hdfsjournal-allbookiefailure"));
       EditLogOutputStream out = bkjm.startLogSegment(txid);

       for (long i = 1 ; i <= 3; i++) {
@@ -341,7 +478,7 @@ public void testAllBookieFailure() throws Exception {
       out.flush();
       bookieToFail.shutdown();
       assertEquals("New bookie didn't die",
-                   numBookies, bkutil.checkBookiesUp(numBookies, 10));
+                   numBookies, checkBookiesUp(numBookies, 10));

       try {
         for (long i = 1 ; i <= 3; i++) {
@@ -357,10 +494,10 @@ public void testAllBookieFailure() throws Exception {
         assertTrue("Invalid exception message",
                    ioe.getMessage().contains("Failed to write to bookkeeper"));
       }
-      replacementBookie = bkutil.newBookie();
+      replacementBookie = newBookie();
       assertEquals("New bookie didn't start",
-                   numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
+                   numBookies+1, checkBookiesUp(numBookies+1, 10));
       out = bkjm.startLogSegment(txid);
       for (long i = 1 ; i <= 3; i++) {
         FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -380,7 +517,7 @@ public void testAllBookieFailure() throws Exception {
       }

       bookieToFail.shutdown();
-      if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
+      if (checkBookiesUp(numBookies, 30) != numBookies) {
        LOG.warn("Not all bookies from this test shut down, expect errors");
       }
     }
@@ -393,13 +530,13 @@
    */
   @Test
   public void testOneBookieFailure() throws Exception {
-    BookieServer bookieToFail = bkutil.newBookie();
+    BookieServer bookieToFail = newBookie();
     BookieServer replacementBookie = null;

     try {
       int ensembleSize = numBookies + 1;
       assertEquals("New bookie didn't start",
-                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
+                   ensembleSize, checkBookiesUp(ensembleSize, 10));

       // ensure that the journal manager has to use all bookies,
       // so that a failure will fail the journal manager
@@ -410,7 +547,8 @@ public void testOneBookieFailure() throws Exception {
                   ensembleSize);
       long txid = 1;
       BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-          BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"));
+          URI.create("bookkeeper://" + zkEnsemble
+                     + "/hdfsjournal-onebookiefailure"));
       EditLogOutputStream out = bkjm.startLogSegment(txid);
       for (long i = 1 ; i <= 3; i++) {
         FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -420,12 +558,12 @@ public void testOneBookieFailure() throws Exception {
       out.setReadyToFlush();
       out.flush();

-      replacementBookie = bkutil.newBookie();
+      replacementBookie = newBookie();
       assertEquals("replacement bookie didn't start",
-                   ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
+                   ensembleSize+1, checkBookiesUp(ensembleSize+1, 10));
       bookieToFail.shutdown();
       assertEquals("New bookie didn't die",
-                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
+                   ensembleSize, checkBookiesUp(ensembleSize, 10));

       for (long i = 1 ; i <= 3; i++) {
         FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -443,7 +581,7 @@ public void testOneBookieFailure() throws Exception {
       }

       bookieToFail.shutdown();
-      if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
+      if (checkBookiesUp(numBookies, 30) != numBookies) {
         LOG.warn("Not all bookies from this test shut down, expect errors");
       }
     }

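The two bookie-failure tests above set the ensemble and quorum size equal to the number of live bookies so that losing any one bookie fails the journal. A minimal sketch of the same configuration outside a test, using only the constants and constructor visible in this diff (the ZooKeeper address and journal path are placeholders):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;

public class BkjmConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Force writes onto every bookie: ensemble == quorum == bookie count.
    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, 4);
    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, 4);
    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
        URI.create("bookkeeper://localhost:2181/hdfsjournal-example"));
    // startLogSegment()/finalizeLogSegment() then follow the tests above.
  }
}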
View File: FSEditLogTestUtil.java

@@ -36,9 +36,4 @@ public static long countTransactionsInStream(EditLogInputStream in)
     FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
     return (validation.getEndTxId() - in.getFirstTxId()) + 1;
   }
-
-  public static void setRuntimeForEditLog(NameNode nn, Runtime rt) {
-    nn.setRuntimeForTesting(rt);
-    nn.getFSImage().getEditLog().setRuntimeForTesting(rt);
-  }
 }

View File: FSEditLog.java

@@ -210,10 +210,6 @@ private synchronized void initJournals(List<URI> dirs) {
         DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);

     journalSet = new JournalSet(minimumRedundantJournals);
-    // set runtime so we can test starting with a faulty or unavailable
-    // shared directory
-    this.journalSet.setRuntimeForTesting(runtime);

     for (URI u : dirs) {
       boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
           .contains(u);
@@ -825,7 +821,7 @@ synchronized public JournalSet getJournalSet() {
    * Used only by unit tests.
    */
   @VisibleForTesting
-  synchronized public void setRuntimeForTesting(Runtime runtime) {
+  synchronized void setRuntimeForTesting(Runtime runtime) {
     this.runtime = runtime;
     this.journalSet.setRuntimeForTesting(runtime);
   }

View File: MiniDFSCluster.java

@@ -131,7 +131,6 @@ public static class Builder {
     private int numDataNodes = 1;
     private boolean format = true;
     private boolean manageNameDfsDirs = true;
-    private boolean manageNameDfsSharedDirs = true;
     private boolean manageDataDfsDirs = true;
     private StartupOption option = null;
     private String[] racks = null;
@@ -186,14 +185,6 @@ public Builder manageNameDfsDirs(boolean val) {
       return this;
     }

-    /**
-     * Default: true
-     */
-    public Builder manageNameDfsSharedDirs(boolean val) {
-      this.manageNameDfsSharedDirs = val;
-      return this;
-    }
-
     /**
      * Default: true
      */
@@ -295,7 +286,6 @@ private MiniDFSCluster(Builder builder) throws IOException {
                        builder.numDataNodes,
                        builder.format,
                        builder.manageNameDfsDirs,
-                       builder.manageNameDfsSharedDirs,
                        builder.manageDataDfsDirs,
                        builder.option,
                        builder.racks,
@@ -535,7 +525,7 @@ public MiniDFSCluster(int nameNodePort,
                         long[] simulatedCapacities) throws IOException {
     this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
     initMiniDFSCluster(conf, numDataNodes, format,
-        manageNameDfsDirs, true, manageDataDfsDirs, operation, racks, hosts,
+        manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts,
         simulatedCapacities, null, true, false,
         MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0));
   }
@@ -543,8 +533,7 @@ public MiniDFSCluster(int nameNodePort,
   private void initMiniDFSCluster(
       Configuration conf,
       int numDataNodes, boolean format, boolean manageNameDfsDirs,
-      boolean manageNameDfsSharedDirs, boolean manageDataDfsDirs,
-      StartupOption operation, String[] racks,
+      boolean manageDataDfsDirs, StartupOption operation, String[] racks,
       String[] hosts, long[] simulatedCapacities, String clusterId,
       boolean waitSafeMode, boolean setupHostsFile,
       MiniDFSNNTopology nnTopology)
@@ -583,8 +572,7 @@ private void initMiniDFSCluster(
     federation = nnTopology.isFederated();
     createNameNodesAndSetConf(
-        nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs,
-        format, operation, clusterId, conf);
+        nnTopology, manageNameDfsDirs, format, operation, clusterId, conf);

     if (format) {
       if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
@@ -605,8 +593,8 @@ private void initMiniDFSCluster(
   }

   private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
-      boolean manageNameDfsDirs, boolean manageNameDfsSharedDirs,
-      boolean format, StartupOption operation, String clusterId,
+      boolean manageNameDfsDirs, boolean format, StartupOption operation,
+      String clusterId,
       Configuration conf) throws IOException {
     Preconditions.checkArgument(nnTopology.countNameNodes() > 0,
         "empty NN topology: no namenodes specified!");
@@ -651,7 +639,7 @@ private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
     if (nnIds.size() > 1) {
       conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()),
           Joiner.on(",").join(nnIds));
-      if (manageNameDfsSharedDirs) {
+      if (manageNameDfsDirs) {
         URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter+nnIds.size()-1);
         conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
       }

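With manageNameDfsSharedDirs removed, an HA MiniDFSCluster's shared edits directory is managed (or not) together with the regular name dirs via manageNameDfsDirs, per the if (manageNameDfsDirs) change above. A minimal sketch of bringing up an HA mini-cluster with the surviving builder API, based on the tests deleted earlier in this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;

public class HaMiniClusterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology()) // two NNs
        .numDataNodes(0)
        .build(); // shared edits now fall under manageNameDfsDirs
    try {
      cluster.waitActive();
      cluster.transitionToActive(0); // make the first NN active
    } finally {
      cluster.shutdown();
    }
  }
}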
View File: TestStandbyCheckpoints.java

@@ -54,9 +54,9 @@
 public class TestStandbyCheckpoints {
   private static final int NUM_DIRS_IN_LOG = 200000;

-  protected MiniDFSCluster cluster;
-  protected NameNode nn0, nn1;
-  protected FileSystem fs;
+  private MiniDFSCluster cluster;
+  private NameNode nn0, nn1;
+  private FileSystem fs;

   @SuppressWarnings("rawtypes")
   @Before