HDFS-4266. BKJM: Separate write and ack quorum (Rakesh R via umamahesh)

This commit is contained in:
Uma Maheswara Rao G 2015-02-17 21:28:49 +05:30
parent f24a56787a
commit f0412de1c1
3 changed files with 163 additions and 7 deletions

View File

@ -639,6 +639,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7797. Add audit log for setQuota operation (Rakesh R via umamahesh)
HDFS-4266. BKJM: Separate write and ack quorum (Rakesh R via umamahesh)
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -152,6 +152,13 @@ public class BookKeeperJournalManager implements JournalManager {
= "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec";
public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5;
public static final String BKJM_BOOKKEEPER_ACK_QUORUM_SIZE
= "dfs.namenode.bookkeeperjournal.ack.quorum-size";
public static final String BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC
= "dfs.namenode.bookkeeperjournal.addEntryTimeoutSec";
public static final int BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT = 5;
private ZooKeeper zkc;
private final Configuration conf;
private final BookKeeper bkc;
@ -162,6 +169,8 @@ public class BookKeeperJournalManager implements JournalManager {
private final MaxTxId maxTxId;
private final int ensembleSize;
private final int quorumSize;
private final int ackQuorumSize;
private final int addEntryTimeout;
private final String digestpw;
private final int speculativeReadTimeout;
private final int readEntryTimeout;
@ -184,6 +193,9 @@ public class BookKeeperJournalManager implements JournalManager {
BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
ackQuorumSize = conf.getInt(BKJM_BOOKKEEPER_ACK_QUORUM_SIZE, quorumSize);
addEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT);
speculativeReadTimeout = conf.getInt(
BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT);
@ -216,6 +228,7 @@ public class BookKeeperJournalManager implements JournalManager {
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setSpeculativeReadTimeout(speculativeReadTimeout);
clientConf.setReadEntryTimeout(readEntryTimeout);
clientConf.setAddEntryTimeout(addEntryTimeout);
bkc = new BookKeeper(clientConf, zkc);
} catch (KeeperException e) {
throw new IOException("Error initializing zk", e);
@ -403,7 +416,7 @@ public class BookKeeperJournalManager implements JournalManager {
// bookkeeper errored on last stream, clean up ledger
currentLedger.close();
}
currentLedger = bkc.createLedger(ensembleSize, quorumSize,
currentLedger = bkc.createLedger(ensembleSize, quorumSize, ackQuorumSize,
BookKeeper.DigestType.MAC,
digestpw.getBytes(Charsets.UTF_8));
} catch (BKException bke) {

View File

@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Callable;
@ -67,6 +68,7 @@ public class TestBookKeeperJournalManager {
private ZooKeeper zkc;
private static BKJMUtil bkutil;
static int numBookies = 3;
private BookieServer newBookie;
@BeforeClass
public static void setupBookkeeper() throws Exception {
@ -87,6 +89,9 @@ public class TestBookKeeperJournalManager {
@After
public void teardown() throws Exception {
zkc.close();
if (newBookie != null) {
newBookie.shutdown();
}
}
private NamespaceInfo newNSInfo() {
@ -377,7 +382,8 @@ public class TestBookKeeperJournalManager {
*/
@Test
public void testAllBookieFailure() throws Exception {
BookieServer bookieToFail = bkutil.newBookie();
// bookie to fail
newBookie = bkutil.newBookie();
BookieServer replacementBookie = null;
try {
@ -408,7 +414,7 @@ public class TestBookKeeperJournalManager {
}
out.setReadyToFlush();
out.flush();
bookieToFail.shutdown();
newBookie.shutdown();
assertEquals("New bookie didn't die",
numBookies, bkutil.checkBookiesUp(numBookies, 10));
@ -449,7 +455,7 @@ public class TestBookKeeperJournalManager {
if (replacementBookie != null) {
replacementBookie.shutdown();
}
bookieToFail.shutdown();
newBookie.shutdown();
if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
LOG.warn("Not all bookies from this test shut down, expect errors");
@ -464,7 +470,7 @@ public class TestBookKeeperJournalManager {
*/
@Test
public void testOneBookieFailure() throws Exception {
BookieServer bookieToFail = bkutil.newBookie();
newBookie = bkutil.newBookie();
BookieServer replacementBookie = null;
try {
@ -500,7 +506,7 @@ public class TestBookKeeperJournalManager {
replacementBookie = bkutil.newBookie();
assertEquals("replacement bookie didn't start",
ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
bookieToFail.shutdown();
newBookie.shutdown();
assertEquals("New bookie didn't die",
ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
@ -518,7 +524,7 @@ public class TestBookKeeperJournalManager {
if (replacementBookie != null) {
replacementBookie.shutdown();
}
bookieToFail.shutdown();
newBookie.shutdown();
if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
LOG.warn("Not all bookies from this test shut down, expect errors");
@ -822,6 +828,141 @@ public class TestBookKeeperJournalManager {
assertTrue("No thread managed to complete formatting", numCompleted > 0);
}
@Test(timeout = 120000)
public void testDefaultAckQuorum() throws Exception {
newBookie = bkutil.newBookie();
int ensembleSize = numBookies + 1;
int quorumSize = numBookies + 1;
// ensure that the journal manager has to use all bookies,
// so that a failure will fail the journal manager
Configuration conf = new Configuration();
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
ensembleSize);
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
quorumSize);
// sets 2 secs
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
2);
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi);
bkjm.format(nsi);
CountDownLatch sleepLatch = new CountDownLatch(1);
sleepBookie(sleepLatch, newBookie);
EditLogOutputStream out = bkjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
int numTransactions = 100;
for (long i = 1; i <= numTransactions; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
try {
out.close();
bkjm.finalizeLogSegment(1, numTransactions);
List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
bkjm.selectInputStreams(in, 1, true);
try {
assertEquals(numTransactions,
FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
} finally {
in.get(0).close();
}
fail("Should throw exception as not enough non-faulty bookies available!");
} catch (IOException ioe) {
// expected
}
}
/**
* Test ack quorum feature supported by bookkeeper. Keep ack quorum bookie
* alive and sleep all the other bookies. Now the client would wait for the
* acknowledgement from the ack size bookies and after receiving the success
* response will continue writing. Non ack client will hang long time to add
* entries.
*/
@Test(timeout = 120000)
public void testAckQuorum() throws Exception {
// slow bookie
newBookie = bkutil.newBookie();
// make quorum size and ensemble size same to avoid the interleave writing
// of the ledger entries
int ensembleSize = numBookies + 1;
int quorumSize = numBookies + 1;
int ackSize = numBookies;
// ensure that the journal manager has to use all bookies,
// so that a failure will fail the journal manager
Configuration conf = new Configuration();
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
ensembleSize);
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
quorumSize);
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ACK_QUORUM_SIZE,
ackSize);
// sets 60 minutes
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
3600);
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi);
bkjm.format(nsi);
CountDownLatch sleepLatch = new CountDownLatch(1);
sleepBookie(sleepLatch, newBookie);
EditLogOutputStream out = bkjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
int numTransactions = 100;
for (long i = 1; i <= numTransactions; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, numTransactions);
List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
bkjm.selectInputStreams(in, 1, true);
try {
assertEquals(numTransactions,
FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
} finally {
sleepLatch.countDown();
in.get(0).close();
bkjm.close();
}
}
/**
* Sleep a bookie until I count down the latch
*
* @param latch
* Latch to wait on
* @param bookie
* bookie server
* @throws Exception
*/
private void sleepBookie(final CountDownLatch l, final BookieServer bookie)
throws Exception {
Thread sleeper = new Thread() {
public void run() {
try {
bookie.suspendProcessing();
l.await(60, TimeUnit.SECONDS);
bookie.resumeProcessing();
} catch (Exception e) {
LOG.error("Error suspending bookie", e);
}
}
};
sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
sleeper.start();
}
private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
int startTxid, int endTxid) throws IOException, KeeperException,
InterruptedException {