HDFS-2717. BookKeeper Journal output stream doesn't check addComplete rc. Contributed by Ivan Kelly.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1342534 13f79535-47bb-0310-9956-ffa450edef68
Uma Maheswara Rao G committed 2012-05-25 08:56:59 +00:00
parent befd45fcb1
commit e767845c36
11 changed files with 716 additions and 228 deletions
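The change to BookKeeperEditLogOutputStream itself is not reproduced in the hunks below. As a minimal sketch of the pattern the commit title describes — recording the `rc` passed to `addComplete()` so a later flush fails loudly instead of silently dropping edits — the field and class names here are illustrative, not the committed code:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;

// Record the first non-OK return code from addComplete() so a later
// flush can surface the asynchronous write failure.
class AddCompleteTracker implements AsyncCallback.AddCallback {
  private final AtomicInteger transmitResult =
      new AtomicInteger(BKException.Code.OK);

  @Override
  public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
    // keep the first error; later successful adds must not mask it
    transmitResult.compareAndSet(BKException.Code.OK, rc);
  }

  // called from flush(): throw if any asynchronous add has failed
  void checkComplete() throws IOException {
    int rc = transmitResult.get();
    if (rc != BKException.Code.OK) {
      throw new IOException("Failed to write to bookkeeper; Error is ("
          + rc + ") " + BKException.getMessage(rc));
    }
  }
}
```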

pom.xml (bkjournal contrib module)

@@ -54,6 +54,12 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-server</artifactId>
@@ -64,6 +70,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <profiles>
     <profile>

BookKeeperEditLogInputStream.java

@@ -41,6 +41,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
   private final long firstTxId;
   private final long lastTxId;
   private final int logVersion;
+  private final boolean inProgress;
   private final LedgerHandle lh;

   private final FSEditLogOp.Reader reader;
@@ -69,6 +70,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
     this.firstTxId = metadata.getFirstTxId();
     this.lastTxId = metadata.getLastTxId();
     this.logVersion = metadata.getVersion();
+    this.inProgress = metadata.isInProgress();

     BufferedInputStream bin = new BufferedInputStream(
         new LedgerInputStream(lh, firstBookKeeperEntry));
@@ -123,10 +125,28 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
         lh.toString(), firstTxId, lastTxId);
   }

-  // TODO(HA): Test this.
   @Override
   public boolean isInProgress() {
-    return true;
+    return inProgress;
   }

+  /**
+   * Skip forward to specified transaction id.
+   * Currently we do this by just iterating forward.
+   * If this proves to be too expensive, this can be reimplemented
+   * with a binary search over bk entries
+   */
+  public void skipTo(long txId) throws IOException {
+    long numToSkip = txId - getFirstTxId();
+
+    FSEditLogOp op = null;
+    for (long i = 0; i < numToSkip; i++) {
+      op = readOp();
+    }
+    if (op != null && op.getTransactionId() != txId-1) {
+      throw new IOException("Corrupt stream, expected txid "
+          + (txId-1) + ", got " + op.getTransactionId());
+    }
+  }

   /**
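For clarity, the intended skipTo() behaviour on a segment covering, say, txids 101 through 200 looks like this (a sketch only: lh and metadata are assumed to come from an already-opened ledger as in getInputStream() below, and readOp() is assumed visible to the caller):

```java
// The stream opens positioned at the segment's first txid (101 here).
BookKeeperEditLogInputStream in =
    new BookKeeperEditLogInputStream(lh, metadata);
in.skipTo(150);               // reads and discards ops 101..149
FSEditLogOp op = in.readOp(); // the next op carries txid 150
```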

BookKeeperJournalManager.java

@@ -233,11 +233,14 @@ public class BookKeeperJournalManager implements JournalManager {
        */
       l.write(zkc, znodePath);

+      maxTxId.store(txId);
       return new BookKeeperEditLogOutputStream(conf, currentLedger, wl);
     } catch (Exception e) {
       if (currentLedger != null) {
         try {
+          long id = currentLedger.getId();
           currentLedger.close();
+          bkc.deleteLedger(id);
         } catch (Exception e2) {
           //log & ignore, an IOException will be thrown soon
           LOG.error("Error closing ledger", e2);
@@ -313,18 +316,34 @@ public class BookKeeperJournalManager implements JournalManager {
     }
   }

-  // TODO(HA): Handle inProgressOk
-  EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
+  EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk)
       throws IOException {
     for (EditLogLedgerMetadata l : getLedgerList()) {
-      if (l.getFirstTxId() == fromTxnId) {
+      long lastTxId = l.getLastTxId();
+      if (l.isInProgress()) {
+        if (!inProgressOk) {
+          continue;
+        }
+        lastTxId = recoverLastTxId(l, false);
+      }
+
+      if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
         try {
-          LedgerHandle h = bkc.openLedger(l.getLedgerId(),
-              BookKeeper.DigestType.MAC,
-              digestpw.getBytes());
-          return new BookKeeperEditLogInputStream(h, l);
+          LedgerHandle h;
+          if (l.isInProgress()) { // we don't want to fence the current journal
+            h = bkc.openLedgerNoRecovery(l.getLedgerId(),
+                BookKeeper.DigestType.MAC, digestpw.getBytes());
+          } else {
+            h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
+                digestpw.getBytes());
+          }
+          BookKeeperEditLogInputStream s = new BookKeeperEditLogInputStream(h,
+              l);
+          s.skipTo(fromTxId);
+          return s;
         } catch (Exception e) {
-          throw new IOException("Could not open ledger for " + fromTxnId, e);
+          throw new IOException("Could not open ledger for " + fromTxId, e);
         }
       }
     }
@@ -354,26 +373,31 @@ public class BookKeeperJournalManager implements JournalManager {
     }
   }

-  // TODO(HA): Handle inProgressOk
-  long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
+  long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
       throws IOException {
     long count = 0;
     long expectedStart = 0;
     for (EditLogLedgerMetadata l : getLedgerList()) {
+      long lastTxId = l.getLastTxId();
       if (l.isInProgress()) {
-        long endTxId = recoverLastTxId(l);
-        if (endTxId == HdfsConstants.INVALID_TXID) {
+        if (!inProgressOk) {
+          continue;
+        }
+        lastTxId = recoverLastTxId(l, false);
+        if (lastTxId == HdfsConstants.INVALID_TXID) {
           break;
         }
-        count += (endTxId - l.getFirstTxId()) + 1;
-        break;
       }
-      if (l.getFirstTxId() < fromTxnId) {
+
+      assert lastTxId >= l.getFirstTxId();
+      if (lastTxId < fromTxId) {
         continue;
-      } else if (l.getFirstTxId() == fromTxnId) {
-        count = (l.getLastTxId() - l.getFirstTxId()) + 1;
-        expectedStart = l.getLastTxId() + 1;
+      } else if (l.getFirstTxId() <= fromTxId && lastTxId >= fromTxId) {
+        // we can start in the middle of a segment
+        count = (lastTxId - l.getFirstTxId()) + 1;
+        expectedStart = lastTxId + 1;
       } else {
         if (expectedStart != l.getFirstTxId()) {
           if (count == 0) {
@@ -384,8 +408,8 @@ public class BookKeeperJournalManager implements JournalManager {
           break;
         }
       }
-      count += (l.getLastTxId() - l.getFirstTxId()) + 1;
-      expectedStart = l.getLastTxId() + 1;
+      count += (lastTxId - l.getFirstTxId()) + 1;
+      expectedStart = lastTxId + 1;
     }
     return count;
@@ -404,7 +428,7 @@ public class BookKeeperJournalManager implements JournalManager {
         String znode = ledgerPath + "/" + child;
         EditLogLedgerMetadata l
           = EditLogLedgerMetadata.read(zkc, znode);
-        long endTxId = recoverLastTxId(l);
+        long endTxId = recoverLastTxId(l, true);
         if (endTxId == HdfsConstants.INVALID_TXID) {
           LOG.error("Unrecoverable corruption has occurred in segment "
               + l.toString() + " at path " + znode
@@ -474,11 +498,19 @@ public class BookKeeperJournalManager implements JournalManager {
    * Find the id of the last edit log transaction writen to a edit log
    * ledger.
    */
-  private long recoverLastTxId(EditLogLedgerMetadata l) throws IOException {
+  private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
+      throws IOException {
     try {
-      LedgerHandle lh = bkc.openLedger(l.getLedgerId(),
-          BookKeeper.DigestType.MAC,
-          digestpw.getBytes());
+      LedgerHandle lh = null;
+      if (fence) {
+        lh = bkc.openLedger(l.getLedgerId(),
+            BookKeeper.DigestType.MAC,
+            digestpw.getBytes());
+      } else {
+        lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
+            BookKeeper.DigestType.MAC,
+            digestpw.getBytes());
+      }
       long lastAddConfirmed = lh.getLastAddConfirmed();
       BookKeeperEditLogInputStream in
         = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
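The new fence parameter selects between the two BookKeeper open calls used above. Roughly, as a sketch with bkc, ledgerId, and digestpw assumed in scope:

```java
// openLedger() performs ledger recovery: it fences the ledger so the
// current writer can no longer add entries, then seals the last
// confirmed entry. Appropriate when taking over a segment (fence=true).
LedgerHandle fenced = bkc.openLedger(ledgerId,
    BookKeeper.DigestType.MAC, digestpw.getBytes());

// openLedgerNoRecovery() reads without fencing, so a standby can tail
// an in-progress segment while the active writer keeps appending
// (fence=false, as used by getInputStream() and getNumberOfTransactions()).
LedgerHandle unfenced = bkc.openLedgerNoRecovery(ledgerId,
    BookKeeper.DigestType.MAC, digestpw.getBytes());
```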

BKJMUtil.java (new file)

@@ -0,0 +1,182 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import static org.junit.Assert.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.KeeperException;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.LocalBookKeeper;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.io.IOException;
import java.io.File;
/**
* Utility class for setting up bookkeeper ensembles
* and bringing individual bookies up and down
*/
class BKJMUtil {
protected static final Log LOG = LogFactory.getLog(BKJMUtil.class);
int nextPort = 6000; // next port for additionally created bookies
private Thread bkthread = null;
private final static String zkEnsemble = "127.0.0.1:2181";
int numBookies;
BKJMUtil(final int numBookies) throws Exception {
this.numBookies = numBookies;
bkthread = new Thread() {
public void run() {
try {
String[] args = new String[1];
args[0] = String.valueOf(numBookies);
LOG.info("Starting bk");
LocalBookKeeper.main(args);
} catch (InterruptedException e) {
// go away quietly
} catch (Exception e) {
LOG.error("Error starting local bk", e);
}
}
};
}
void start() throws Exception {
bkthread.start();
if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
throw new Exception("Error starting zookeeper/bookkeeper");
}
assertEquals("Not all bookies started",
numBookies, checkBookiesUp(numBookies, 10));
}
void teardown() throws Exception {
if (bkthread != null) {
bkthread.interrupt();
bkthread.join();
}
}
static ZooKeeper connectZooKeeper()
throws IOException, KeeperException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
if (!latch.await(3, TimeUnit.SECONDS)) {
throw new IOException("Zookeeper took too long to connect");
}
return zkc;
}
static URI createJournalURI(String path) throws Exception {
return URI.create("bookkeeper://" + zkEnsemble + path);
}
static void addJournalManagerDefinition(Configuration conf) {
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".bookkeeper",
"org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager");
}
BookieServer newBookie() throws Exception {
int port = nextPort++;
ServerConfiguration bookieConf = new ServerConfiguration();
bookieConf.setBookiePort(port);
File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_",
"test");
tmpdir.delete();
tmpdir.mkdir();
bookieConf.setZkServers(zkEnsemble);
bookieConf.setJournalDirName(tmpdir.getPath());
bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() });
BookieServer b = new BookieServer(bookieConf);
b.start();
for (int i = 0; i < 10 && !b.isRunning(); i++) {
Thread.sleep(10000);
}
if (!b.isRunning()) {
throw new IOException("Bookie would not start");
}
return b;
}
/**
* Check that a number of bookies are available
* @param count number of bookies required
* @param timeout number of seconds to wait for bookies to start
* @throws IOException if bookies are not started by the time the timeout hits
*/
int checkBookiesUp(int count, int timeout) throws Exception {
ZooKeeper zkc = connectZooKeeper();
try {
boolean up = false;
int mostRecentSize = 0;
for (int i = 0; i < timeout; i++) {
try {
List<String> children = zkc.getChildren("/ledgers/available",
false);
mostRecentSize = children.size();
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + mostRecentSize + " bookies up, "
+ "waiting for " + count);
if (LOG.isTraceEnabled()) {
for (String child : children) {
LOG.trace(" server: " + child);
}
}
}
if (mostRecentSize == count) {
up = true;
break;
}
} catch (KeeperException e) {
// ignore
}
Thread.sleep(1000);
}
return mostRecentSize;
} finally {
zkc.close();
}
}
}
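A hypothetical test skeleton showing how BKJMUtil is meant to be driven; every BKJMUtil method called here is defined above, only the test class itself is invented:

```java
import static org.junit.Assert.assertEquals;

import org.apache.bookkeeper.proto.BookieServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class ExampleBKJMUtilUsage {
  private static BKJMUtil bkutil;

  @BeforeClass
  public static void setup() throws Exception {
    bkutil = new BKJMUtil(3); // local ZooKeeper plus three bookies
    bkutil.start();
  }

  @AfterClass
  public static void teardown() throws Exception {
    bkutil.teardown();
  }

  @Test
  public void testWithExtraBookie() throws Exception {
    BookieServer extra = bkutil.newBookie(); // grow the ensemble to four
    try {
      assertEquals(4, bkutil.checkBookiesUp(4, 10));
    } finally {
      extra.shutdown();
    }
  }
}
```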

TestBookKeeperAsHASharedDir.java (new file)

@@ -0,0 +1,267 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import static org.junit.Assert.*;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verify;
/**
* Integration test to ensure that the BookKeeper JournalManager
* works for HDFS Namenode HA
*/
public class TestBookKeeperAsHASharedDir {
static final Log LOG = LogFactory.getLog(TestBookKeeperAsHASharedDir.class);
private static BKJMUtil bkutil;
static int numBookies = 3;
private static final String TEST_FILE_DATA = "HA BookKeeperJournalManager";
@BeforeClass
public static void setupBookkeeper() throws Exception {
bkutil = new BKJMUtil(numBookies);
bkutil.start();
}
@AfterClass
public static void teardownBookkeeper() throws Exception {
bkutil.teardown();
}
/**
* Test simple HA failover usecase with BK
*/
@Test
public void testFailoverWithBK() throws Exception {
Runtime mockRuntime1 = mock(Runtime.class);
Runtime mockRuntime2 = mock(Runtime.class);
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/hotfailover").toString());
BKJMUtil.addJournalManagerDefinition(conf);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.manageNameDfsSharedDirs(false)
.build();
NameNode nn1 = cluster.getNameNode(0);
NameNode nn2 = cluster.getNameNode(1);
FSEditLogTestUtil.setRuntimeForEditLog(nn1, mockRuntime1);
FSEditLogTestUtil.setRuntimeForEditLog(nn2, mockRuntime2);
cluster.waitActive();
cluster.transitionToActive(0);
Path p = new Path("/testBKJMfailover");
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
fs.mkdirs(p);
cluster.shutdownNameNode(0);
cluster.transitionToActive(1);
assertTrue(fs.exists(p));
} finally {
verify(mockRuntime1, times(0)).exit(anyInt());
verify(mockRuntime2, times(0)).exit(anyInt());
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test HA failover, where BK, as the shared storage, fails.
* Once it becomes available again, a standby can come up.
* Verify that any write happening after the BK fail is not
* available on the standby.
*/
@Test
public void testFailoverWithFailingBKCluster() throws Exception {
int ensembleSize = numBookies + 1;
BookieServer newBookie = bkutil.newBookie();
assertEquals("New bookie didn't start",
ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
BookieServer replacementBookie = null;
Runtime mockRuntime1 = mock(Runtime.class);
Runtime mockRuntime2 = mock(Runtime.class);
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/hotfailoverWithFail").toString());
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
ensembleSize);
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
ensembleSize);
BKJMUtil.addJournalManagerDefinition(conf);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.manageNameDfsSharedDirs(false)
.build();
NameNode nn1 = cluster.getNameNode(0);
NameNode nn2 = cluster.getNameNode(1);
FSEditLogTestUtil.setRuntimeForEditLog(nn1, mockRuntime1);
FSEditLogTestUtil.setRuntimeForEditLog(nn2, mockRuntime2);
cluster.waitActive();
cluster.transitionToActive(0);
Path p1 = new Path("/testBKJMFailingBKCluster1");
Path p2 = new Path("/testBKJMFailingBKCluster2");
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
fs.mkdirs(p1);
newBookie.shutdown(); // will take down shared storage
assertEquals("New bookie didn't stop",
numBookies, bkutil.checkBookiesUp(numBookies, 10));
// mkdirs will "succeed", but the NN will have called runtime.exit
fs.mkdirs(p2);
verify(mockRuntime1, atLeastOnce()).exit(anyInt());
verify(mockRuntime2, times(0)).exit(anyInt());
cluster.shutdownNameNode(0);
try {
cluster.transitionToActive(1);
fail("Shouldn't have been able to transition with bookies down");
} catch (ServiceFailedException e) {
assertTrue("Wrong exception",
e.getMessage().contains("Failed to start active services"));
}
verify(mockRuntime2, atLeastOnce()).exit(anyInt());
replacementBookie = bkutil.newBookie();
assertEquals("Replacement bookie didn't start",
ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
cluster.transitionToActive(1); // should work fine now
assertTrue(fs.exists(p1));
assertFalse(fs.exists(p2));
} finally {
newBookie.shutdown();
if (replacementBookie != null) {
replacementBookie.shutdown();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test that two namenodes can't become primary at the same
* time.
*/
@Test
public void testMultiplePrimariesStarted() throws Exception {
Runtime mockRuntime1 = mock(Runtime.class);
Runtime mockRuntime2 = mock(Runtime.class);
Path p1 = new Path("/testBKJMMultiplePrimary");
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/hotfailoverMultiple").toString());
BKJMUtil.addJournalManagerDefinition(conf);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.manageNameDfsSharedDirs(false)
.build();
NameNode nn1 = cluster.getNameNode(0);
NameNode nn2 = cluster.getNameNode(1);
FSEditLogTestUtil.setRuntimeForEditLog(nn1, mockRuntime1);
FSEditLogTestUtil.setRuntimeForEditLog(nn2, mockRuntime2);
cluster.waitActive();
cluster.transitionToActive(0);
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
fs.mkdirs(p1);
nn1.getRpcServer().rollEditLog();
try {
cluster.transitionToActive(1);
fail("Shouldn't have been able to start two primaries"
+ " with single shared storage");
} catch (ServiceFailedException sfe) {
assertTrue("Wrong exception",
sfe.getMessage().contains("Failed to start active services"));
}
} finally {
verify(mockRuntime1, times(0)).exit(anyInt());
verify(mockRuntime2, atLeastOnce()).exit(anyInt());
if (cluster != null) {
cluster.shutdown();
}
}
}
}

TestBookKeeperHACheckpoints.java (new file)

@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import org.apache.hadoop.hdfs.server.namenode.ha.TestStandbyCheckpoints;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.junit.Before;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.AfterClass;
/**
* Runs the same tests as TestStandbyCheckpoints, but
* using a bookkeeper journal manager as the shared directory
*/
public class TestBookKeeperHACheckpoints extends TestStandbyCheckpoints {
private static BKJMUtil bkutil = null;
static int numBookies = 3;
static int journalCount = 0;
@Override
@Before
public void setupCluster() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/checkpointing" + journalCount++)
.toString());
BKJMUtil.addJournalManagerDefinition(conf);
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
.addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology)
.numDataNodes(0)
.manageNameDfsSharedDirs(false)
.build();
cluster.waitActive();
nn0 = cluster.getNameNode(0);
nn1 = cluster.getNameNode(1);
fs = HATestUtil.configureFailoverFs(cluster, conf);
cluster.transitionToActive(0);
}
@BeforeClass
public static void startBK() throws Exception {
journalCount = 0;
bkutil = new BKJMUtil(numBookies);
bkutil.start();
}
@AfterClass
public static void shutdownBK() throws Exception {
if (bkutil != null) {
bkutil.teardown();
}
}
@Override
public void testCheckpointCancellation() throws Exception {
// Overridden because the implementation in the superclass assumes that writes
// are to a file. This should be fixed at some point
}
}

TestBookKeeperJournalManager.java

@@ -18,55 +18,23 @@
 package org.apache.hadoop.contrib.bkjournal;

 import static org.junit.Assert.*;
-import java.net.URI;
-import java.util.Collections;
-import java.util.Arrays;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.util.LocalBookKeeper;
-import java.io.RandomAccessFile;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.security.SecurityUtil;

 import org.junit.Test;
 import org.junit.Before;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.AfterClass;

-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+
+import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.KeeperException;
-import com.google.common.collect.ImmutableList;
-import java.util.zip.CheckedInputStream;
-import java.util.zip.Checksum;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -75,125 +43,26 @@ public class TestBookKeeperJournalManager {
   static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);

   private static final long DEFAULT_SEGMENT_SIZE = 1000;
-  private static final String zkEnsemble = "localhost:2181";
-  final static private int numBookies = 5;
-  private static Thread bkthread;

   protected static Configuration conf = new Configuration();
   private ZooKeeper zkc;
+  private static BKJMUtil bkutil;
+  static int numBookies = 3;

-  static int nextPort = 6000; // next port for additionally created bookies
-
-  private static ZooKeeper connectZooKeeper(String ensemble)
-      throws IOException, KeeperException, InterruptedException {
-    final CountDownLatch latch = new CountDownLatch(1);
-    ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
-      public void process(WatchedEvent event) {
-        if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
-          latch.countDown();
-        }
-      }
-    });
-    if (!latch.await(3, TimeUnit.SECONDS)) {
-      throw new IOException("Zookeeper took too long to connect");
-    }
-    return zkc;
-  }
-
-  private static BookieServer newBookie() throws Exception {
-    int port = nextPort++;
-    ServerConfiguration bookieConf = new ServerConfiguration();
-    bookieConf.setBookiePort(port);
-    File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_",
-        "test");
-    tmpdir.delete();
-    tmpdir.mkdir();
-
-    bookieConf.setZkServers(zkEnsemble);
-    bookieConf.setJournalDirName(tmpdir.getPath());
-    bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() });
-
-    BookieServer b = new BookieServer(bookieConf);
-    b.start();
-    for (int i = 0; i < 10 && !b.isRunning(); i++) {
-      Thread.sleep(10000);
-    }
-    if (!b.isRunning()) {
-      throw new IOException("Bookie would not start");
-    }
-    return b;
-  }
-
-  /**
-   * Check that a number of bookies are available
-   * @param count number of bookies required
-   * @param timeout number of seconds to wait for bookies to start
-   * @throws IOException if bookies are not started by the time the timeout hits
-   */
-  private static int checkBookiesUp(int count, int timeout) throws Exception {
-    ZooKeeper zkc = connectZooKeeper(zkEnsemble);
-    try {
-      boolean up = false;
-      int mostRecentSize = 0;
-      for (int i = 0; i < timeout; i++) {
-        try {
-          List<String> children = zkc.getChildren("/ledgers/available",
-              false);
-          mostRecentSize = children.size();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found " + mostRecentSize + " bookies up, "
-                + "waiting for " + count);
-            if (LOG.isTraceEnabled()) {
-              for (String child : children) {
-                LOG.trace(" server: " + child);
-              }
-            }
-          }
-          if (mostRecentSize == count) {
-            up = true;
-            break;
-          }
-        } catch (KeeperException e) {
-          // ignore
-        }
-        Thread.sleep(1000);
-      }
-      return mostRecentSize;
-    } finally {
-      zkc.close();
-    }
-  }
-
   @BeforeClass
   public static void setupBookkeeper() throws Exception {
-    bkthread = new Thread() {
-        public void run() {
-          try {
-            String[] args = new String[1];
-            args[0] = String.valueOf(numBookies);
-            LOG.info("Starting bk");
-            LocalBookKeeper.main(args);
-          } catch (InterruptedException e) {
-            // go away quietly
-          } catch (Exception e) {
-            LOG.error("Error starting local bk", e);
-          }
-        }
-      };
-    bkthread.start();
-
-    if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
-      throw new Exception("Error starting zookeeper/bookkeeper");
-    }
-    assertEquals("Not all bookies started",
-                 numBookies, checkBookiesUp(numBookies, 10));
+    bkutil = new BKJMUtil(numBookies);
+    bkutil.start();
   }
+
+  @AfterClass
+  public static void teardownBookkeeper() throws Exception {
+    bkutil.teardown();
+  }
+
   @Before
   public void setup() throws Exception {
-    zkc = connectZooKeeper(zkEnsemble);
+    zkc = BKJMUtil.connectZooKeeper();
   }

   @After
@@ -201,18 +70,10 @@ public class TestBookKeeperJournalManager {
     zkc.close();
   }

-  @AfterClass
-  public static void teardownBookkeeper() throws Exception {
-    if (bkthread != null) {
-      bkthread.interrupt();
-      bkthread.join();
-    }
-  }
-
   @Test
   public void testSimpleWrite() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite"));
+        BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"));
     long txid = 1;
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
@@ -231,8 +92,8 @@ public class TestBookKeeperJournalManager {
   @Test
   public void testNumberOfTransactions() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount"));
+        BKJMUtil.createJournalURI("/hdfsjournal-txncount"));
     long txid = 1;
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
@@ -249,8 +110,8 @@ public class TestBookKeeperJournalManager {
   @Test
   public void testNumberOfTransactionsWithGaps() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps"));
+        BKJMUtil.createJournalURI("/hdfsjournal-gaps"));
     long txid = 1;
     for (long i = 0; i < 3; i++) {
       long start = txid;
@@ -262,9 +123,11 @@ public class TestBookKeeperJournalManager {
       }
       out.close();
       bkjm.finalizeLogSegment(start, txid-1);
-      assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
+      assertNotNull(
+          zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
     }
-    zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1);
+    zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1,
+                                         DEFAULT_SEGMENT_SIZE*2), -1);

     long numTrans = bkjm.getNumberOfTransactions(1, true);
     assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
@@ -282,8 +145,8 @@ public class TestBookKeeperJournalManager {
   @Test
   public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd"));
+        BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"));
     long txid = 1;
     for (long i = 0; i < 3; i++) {
       long start = txid;
@@ -296,7 +159,8 @@ public class TestBookKeeperJournalManager {
       out.close();
       bkjm.finalizeLogSegment(start, (txid-1));
-      assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
+      assertNotNull(
+          zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
     }
     long start = txid;
     EditLogOutputStream out = bkjm.startLogSegment(start);
@@ -320,8 +184,8 @@ public class TestBookKeeperJournalManager {
    */
   @Test
   public void testWriteRestartFrom1() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1"));
+        BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"));
     long txid = 1;
     long start = txid;
     EditLogOutputStream out = bkjm.startLogSegment(txid);
@@ -375,10 +239,10 @@ public class TestBookKeeperJournalManager {
   @Test
   public void testTwoWriters() throws Exception {
     long start = 1;
     BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
+        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
     BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
+        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));

     EditLogOutputStream out1 = bkjm1.startLogSegment(start);
     try {
@@ -391,8 +255,8 @@ public class TestBookKeeperJournalManager {
   @Test
   public void testSimpleRead() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread"));
+        BKJMUtil.createJournalURI("/hdfsjournal-simpleread"));
     long txid = 1;
     final long numTransactions = 10000;
     EditLogOutputStream out = bkjm.startLogSegment(1);
@@ -416,8 +280,8 @@ public class TestBookKeeperJournalManager {
   @Test
   public void testSimpleRecovery() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery"));
+        BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"));
     EditLogOutputStream out = bkjm.startLogSegment(1);
     long txid = 1;
     for (long i = 1 ; i <= 100; i++) {
@@ -448,13 +312,13 @@ public class TestBookKeeperJournalManager {
    */
   @Test
   public void testAllBookieFailure() throws Exception {
-    BookieServer bookieToFail = newBookie();
+    BookieServer bookieToFail = bkutil.newBookie();
     BookieServer replacementBookie = null;

     try {
       int ensembleSize = numBookies + 1;
       assertEquals("New bookie didn't start",
-                   ensembleSize, checkBookiesUp(ensembleSize, 10));
+                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));

       // ensure that the journal manager has to use all bookies,
       // so that a failure will fail the journal manager
@@ -465,8 +329,7 @@ public class TestBookKeeperJournalManager {
           ensembleSize);
       long txid = 1;
       BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-          URI.create("bookkeeper://" + zkEnsemble
-                     + "/hdfsjournal-allbookiefailure"));
+          BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"));
       EditLogOutputStream out = bkjm.startLogSegment(txid);

       for (long i = 1 ; i <= 3; i++) {
@@ -478,7 +341,7 @@ public class TestBookKeeperJournalManager {
       out.flush();
       bookieToFail.shutdown();
       assertEquals("New bookie didn't die",
-                   numBookies, checkBookiesUp(numBookies, 10));
+                   numBookies, bkutil.checkBookiesUp(numBookies, 10));

       try {
         for (long i = 1 ; i <= 3; i++) {
@@ -494,10 +357,10 @@ public class TestBookKeeperJournalManager {
         assertTrue("Invalid exception message",
                    ioe.getMessage().contains("Failed to write to bookkeeper"));
       }
-      replacementBookie = newBookie();
+      replacementBookie = bkutil.newBookie();

       assertEquals("New bookie didn't start",
-                   numBookies+1, checkBookiesUp(numBookies+1, 10));
+                   numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
       out = bkjm.startLogSegment(txid);
       for (long i = 1 ; i <= 3; i++) {
         FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -517,7 +380,7 @@ public class TestBookKeeperJournalManager {
       }

       bookieToFail.shutdown();
-      if (checkBookiesUp(numBookies, 30) != numBookies) {
+      if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
         LOG.warn("Not all bookies from this test shut down, expect errors");
       }
     }
@@ -530,13 +393,13 @@ public class TestBookKeeperJournalManager {
    */
   @Test
   public void testOneBookieFailure() throws Exception {
-    BookieServer bookieToFail = newBookie();
+    BookieServer bookieToFail = bkutil.newBookie();
     BookieServer replacementBookie = null;

     try {
       int ensembleSize = numBookies + 1;
       assertEquals("New bookie didn't start",
-                   ensembleSize, checkBookiesUp(ensembleSize, 10));
+                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));

       // ensure that the journal manager has to use all bookies,
       // so that a failure will fail the journal manager
@@ -547,8 +410,7 @@ public class TestBookKeeperJournalManager {
           ensembleSize);
       long txid = 1;
       BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-          URI.create("bookkeeper://" + zkEnsemble
-                     + "/hdfsjournal-onebookiefailure"));
+          BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"));
       EditLogOutputStream out = bkjm.startLogSegment(txid);
       for (long i = 1 ; i <= 3; i++) {
         FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -558,12 +420,12 @@ public class TestBookKeeperJournalManager {
       out.setReadyToFlush();
       out.flush();

-      replacementBookie = newBookie();
+      replacementBookie = bkutil.newBookie();
       assertEquals("replacement bookie didn't start",
-                   ensembleSize+1, checkBookiesUp(ensembleSize+1, 10));
+                   ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
       bookieToFail.shutdown();
       assertEquals("New bookie didn't die",
-                   ensembleSize, checkBookiesUp(ensembleSize, 10));
+                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));

       for (long i = 1 ; i <= 3; i++) {
         FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -581,10 +443,10 @@ public class TestBookKeeperJournalManager {
       }

       bookieToFail.shutdown();
-      if (checkBookiesUp(numBookies, 30) != numBookies) {
+      if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
         LOG.warn("Not all bookies from this test shut down, expect errors");
       }
     }
   }
 }

FSEditLogTestUtil.java

@@ -36,4 +36,9 @@ public class FSEditLogTestUtil {
     FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
     return (validation.getEndTxId() - in.getFirstTxId()) + 1;
   }
+
+  public static void setRuntimeForEditLog(NameNode nn, Runtime rt) {
+    nn.setRuntimeForTesting(rt);
+    nn.getFSImage().getEditLog().setRuntimeForTesting(rt);
+  }
 }
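This hook is what lets the HA tests above observe a fatal edit-log failure as a verified Runtime.exit() call rather than a killed test JVM. The usage pattern, following TestBookKeeperAsHASharedDir (nn is assumed to be a running NameNode):

```java
Runtime mockRuntime = mock(Runtime.class);
FSEditLogTestUtil.setRuntimeForEditLog(nn, mockRuntime);
// ... provoke a shared-storage failure ...
verify(mockRuntime, atLeastOnce()).exit(anyInt());
```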

FSEditLog.java

@@ -231,6 +231,10 @@ public class FSEditLog {
         DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);

     journalSet = new JournalSet(minimumRedundantJournals);
+    // set runtime so we can test starting with a faulty or unavailable
+    // shared directory
+    this.journalSet.setRuntimeForTesting(runtime);
+
     for (URI u : dirs) {
       boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
           .contains(u);
@@ -842,7 +846,7 @@ public class FSEditLog {
    * Used only by unit tests.
    */
   @VisibleForTesting
-  synchronized void setRuntimeForTesting(Runtime runtime) {
+  synchronized public void setRuntimeForTesting(Runtime runtime) {
     this.runtime = runtime;
     this.journalSet.setRuntimeForTesting(runtime);
   }

MiniDFSCluster.java

@@ -133,6 +133,7 @@ public class MiniDFSCluster {
     private int numDataNodes = 1;
     private boolean format = true;
     private boolean manageNameDfsDirs = true;
+    private boolean manageNameDfsSharedDirs = true;
     private boolean manageDataDfsDirs = true;
     private StartupOption option = null;
     private String[] racks = null;
@@ -187,6 +188,14 @@ public class MiniDFSCluster {
       return this;
     }

+    /**
+     * Default: true
+     */
+    public Builder manageNameDfsSharedDirs(boolean val) {
+      this.manageNameDfsSharedDirs = val;
+      return this;
+    }
+
     /**
      * Default: true
      */
@@ -288,6 +297,7 @@ public class MiniDFSCluster {
                        builder.numDataNodes,
                        builder.format,
                        builder.manageNameDfsDirs,
+                       builder.manageNameDfsSharedDirs,
                        builder.manageDataDfsDirs,
                        builder.option,
                        builder.racks,
@@ -527,7 +537,7 @@ public class MiniDFSCluster {
                         long[] simulatedCapacities) throws IOException {
     this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
     initMiniDFSCluster(conf, numDataNodes, format,
-        manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts,
+        manageNameDfsDirs, true, manageDataDfsDirs, operation, racks, hosts,
         simulatedCapacities, null, true, false,
         MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0));
   }
@@ -535,7 +545,8 @@ public class MiniDFSCluster {
   private void initMiniDFSCluster(
       Configuration conf,
       int numDataNodes, boolean format, boolean manageNameDfsDirs,
-      boolean manageDataDfsDirs, StartupOption operation, String[] racks,
+      boolean manageNameDfsSharedDirs, boolean manageDataDfsDirs,
+      StartupOption operation, String[] racks,
       String[] hosts, long[] simulatedCapacities, String clusterId,
       boolean waitSafeMode, boolean setupHostsFile,
       MiniDFSNNTopology nnTopology)
@@ -574,7 +585,8 @@ public class MiniDFSCluster {
     federation = nnTopology.isFederated();
     createNameNodesAndSetConf(
-        nnTopology, manageNameDfsDirs, format, operation, clusterId, conf);
+        nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs,
+        format, operation, clusterId, conf);

     if (format) {
       if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
@@ -595,8 +607,8 @@ public class MiniDFSCluster {
   }

   private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
-      boolean manageNameDfsDirs, boolean format, StartupOption operation,
-      String clusterId,
+      boolean manageNameDfsDirs, boolean manageNameDfsSharedDirs,
+      boolean format, StartupOption operation, String clusterId,
       Configuration conf) throws IOException {
     Preconditions.checkArgument(nnTopology.countNameNodes() > 0,
         "empty NN topology: no namenodes specified!");
@@ -641,7 +653,7 @@ public class MiniDFSCluster {
       if (nnIds.size() > 1) {
         conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()),
             Joiner.on(",").join(nnIds));
-        if (manageNameDfsDirs) {
+        if (manageNameDfsSharedDirs) {
           URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter+nnIds.size()-1);
           conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
         }
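The resulting builder usage, as exercised by the BKJM tests above (conf is assumed to be a fresh Configuration): local name dirs stay managed by the cluster, while the shared edits dir is pointed at an external bookkeeper:// URI:

```java
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
    BKJMUtil.createJournalURI("/shared").toString());
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
    .nnTopology(MiniDFSNNTopology.simpleHATopology())
    .numDataNodes(0)
    .manageNameDfsSharedDirs(false) // the test owns the shared dir
    .build();
```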

TestStandbyCheckpoints.java

@@ -54,9 +54,9 @@ import com.google.common.collect.Lists;
 public class TestStandbyCheckpoints {
   private static final int NUM_DIRS_IN_LOG = 200000;

-  private MiniDFSCluster cluster;
-  private NameNode nn0, nn1;
-  private FileSystem fs;
+  protected MiniDFSCluster cluster;
+  protected NameNode nn0, nn1;
+  protected FileSystem fs;

   @SuppressWarnings("rawtypes")
   @Before