HDFS-4266. BKJM: Separate write and ack quorum (Rakesh R via umamahesh)
(cherry picked from commit f0412de1c1
)
This commit is contained in:
parent
35fecb5306
commit
2cbac36fd3
|
@ -346,6 +346,8 @@ Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-7797. Add audit log for setQuota operation (Rakesh R via umamahesh)
|
HDFS-7797. Add audit log for setQuota operation (Rakesh R via umamahesh)
|
||||||
|
|
||||||
|
HDFS-4266. BKJM: Separate write and ack quorum (Rakesh R via umamahesh)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
||||||
|
|
|
@ -152,6 +152,13 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
= "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec";
|
= "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec";
|
||||||
public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5;
|
public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5;
|
||||||
|
|
||||||
|
public static final String BKJM_BOOKKEEPER_ACK_QUORUM_SIZE
|
||||||
|
= "dfs.namenode.bookkeeperjournal.ack.quorum-size";
|
||||||
|
|
||||||
|
public static final String BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC
|
||||||
|
= "dfs.namenode.bookkeeperjournal.addEntryTimeoutSec";
|
||||||
|
public static final int BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT = 5;
|
||||||
|
|
||||||
private ZooKeeper zkc;
|
private ZooKeeper zkc;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final BookKeeper bkc;
|
private final BookKeeper bkc;
|
||||||
|
@ -162,6 +169,8 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
private final MaxTxId maxTxId;
|
private final MaxTxId maxTxId;
|
||||||
private final int ensembleSize;
|
private final int ensembleSize;
|
||||||
private final int quorumSize;
|
private final int quorumSize;
|
||||||
|
private final int ackQuorumSize;
|
||||||
|
private final int addEntryTimeout;
|
||||||
private final String digestpw;
|
private final String digestpw;
|
||||||
private final int speculativeReadTimeout;
|
private final int speculativeReadTimeout;
|
||||||
private final int readEntryTimeout;
|
private final int readEntryTimeout;
|
||||||
|
@ -184,6 +193,9 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
|
BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
|
||||||
quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
|
quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
|
||||||
BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
|
BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
|
||||||
|
ackQuorumSize = conf.getInt(BKJM_BOOKKEEPER_ACK_QUORUM_SIZE, quorumSize);
|
||||||
|
addEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
|
||||||
|
BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT);
|
||||||
speculativeReadTimeout = conf.getInt(
|
speculativeReadTimeout = conf.getInt(
|
||||||
BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
|
BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
|
||||||
BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT);
|
BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT);
|
||||||
|
@ -216,6 +228,7 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
ClientConfiguration clientConf = new ClientConfiguration();
|
ClientConfiguration clientConf = new ClientConfiguration();
|
||||||
clientConf.setSpeculativeReadTimeout(speculativeReadTimeout);
|
clientConf.setSpeculativeReadTimeout(speculativeReadTimeout);
|
||||||
clientConf.setReadEntryTimeout(readEntryTimeout);
|
clientConf.setReadEntryTimeout(readEntryTimeout);
|
||||||
|
clientConf.setAddEntryTimeout(addEntryTimeout);
|
||||||
bkc = new BookKeeper(clientConf, zkc);
|
bkc = new BookKeeper(clientConf, zkc);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
throw new IOException("Error initializing zk", e);
|
throw new IOException("Error initializing zk", e);
|
||||||
|
@ -403,7 +416,7 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
// bookkeeper errored on last stream, clean up ledger
|
// bookkeeper errored on last stream, clean up ledger
|
||||||
currentLedger.close();
|
currentLedger.close();
|
||||||
}
|
}
|
||||||
currentLedger = bkc.createLedger(ensembleSize, quorumSize,
|
currentLedger = bkc.createLedger(ensembleSize, quorumSize, ackQuorumSize,
|
||||||
BookKeeper.DigestType.MAC,
|
BookKeeper.DigestType.MAC,
|
||||||
digestpw.getBytes(Charsets.UTF_8));
|
digestpw.getBytes(Charsets.UTF_8));
|
||||||
} catch (BKException bke) {
|
} catch (BKException bke) {
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
@ -67,6 +68,7 @@ public class TestBookKeeperJournalManager {
|
||||||
private ZooKeeper zkc;
|
private ZooKeeper zkc;
|
||||||
private static BKJMUtil bkutil;
|
private static BKJMUtil bkutil;
|
||||||
static int numBookies = 3;
|
static int numBookies = 3;
|
||||||
|
private BookieServer newBookie;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setupBookkeeper() throws Exception {
|
public static void setupBookkeeper() throws Exception {
|
||||||
|
@ -87,6 +89,9 @@ public class TestBookKeeperJournalManager {
|
||||||
@After
|
@After
|
||||||
public void teardown() throws Exception {
|
public void teardown() throws Exception {
|
||||||
zkc.close();
|
zkc.close();
|
||||||
|
if (newBookie != null) {
|
||||||
|
newBookie.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private NamespaceInfo newNSInfo() {
|
private NamespaceInfo newNSInfo() {
|
||||||
|
@ -377,7 +382,8 @@ public class TestBookKeeperJournalManager {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testAllBookieFailure() throws Exception {
|
public void testAllBookieFailure() throws Exception {
|
||||||
BookieServer bookieToFail = bkutil.newBookie();
|
// bookie to fail
|
||||||
|
newBookie = bkutil.newBookie();
|
||||||
BookieServer replacementBookie = null;
|
BookieServer replacementBookie = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -408,7 +414,7 @@ public class TestBookKeeperJournalManager {
|
||||||
}
|
}
|
||||||
out.setReadyToFlush();
|
out.setReadyToFlush();
|
||||||
out.flush();
|
out.flush();
|
||||||
bookieToFail.shutdown();
|
newBookie.shutdown();
|
||||||
assertEquals("New bookie didn't die",
|
assertEquals("New bookie didn't die",
|
||||||
numBookies, bkutil.checkBookiesUp(numBookies, 10));
|
numBookies, bkutil.checkBookiesUp(numBookies, 10));
|
||||||
|
|
||||||
|
@ -449,7 +455,7 @@ public class TestBookKeeperJournalManager {
|
||||||
if (replacementBookie != null) {
|
if (replacementBookie != null) {
|
||||||
replacementBookie.shutdown();
|
replacementBookie.shutdown();
|
||||||
}
|
}
|
||||||
bookieToFail.shutdown();
|
newBookie.shutdown();
|
||||||
|
|
||||||
if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
|
if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
|
||||||
LOG.warn("Not all bookies from this test shut down, expect errors");
|
LOG.warn("Not all bookies from this test shut down, expect errors");
|
||||||
|
@ -464,7 +470,7 @@ public class TestBookKeeperJournalManager {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testOneBookieFailure() throws Exception {
|
public void testOneBookieFailure() throws Exception {
|
||||||
BookieServer bookieToFail = bkutil.newBookie();
|
newBookie = bkutil.newBookie();
|
||||||
BookieServer replacementBookie = null;
|
BookieServer replacementBookie = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -500,7 +506,7 @@ public class TestBookKeeperJournalManager {
|
||||||
replacementBookie = bkutil.newBookie();
|
replacementBookie = bkutil.newBookie();
|
||||||
assertEquals("replacement bookie didn't start",
|
assertEquals("replacement bookie didn't start",
|
||||||
ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
|
ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
|
||||||
bookieToFail.shutdown();
|
newBookie.shutdown();
|
||||||
assertEquals("New bookie didn't die",
|
assertEquals("New bookie didn't die",
|
||||||
ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
|
ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
|
||||||
|
|
||||||
|
@ -518,7 +524,7 @@ public class TestBookKeeperJournalManager {
|
||||||
if (replacementBookie != null) {
|
if (replacementBookie != null) {
|
||||||
replacementBookie.shutdown();
|
replacementBookie.shutdown();
|
||||||
}
|
}
|
||||||
bookieToFail.shutdown();
|
newBookie.shutdown();
|
||||||
|
|
||||||
if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
|
if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
|
||||||
LOG.warn("Not all bookies from this test shut down, expect errors");
|
LOG.warn("Not all bookies from this test shut down, expect errors");
|
||||||
|
@ -822,6 +828,141 @@ public class TestBookKeeperJournalManager {
|
||||||
assertTrue("No thread managed to complete formatting", numCompleted > 0);
|
assertTrue("No thread managed to complete formatting", numCompleted > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testDefaultAckQuorum() throws Exception {
|
||||||
|
newBookie = bkutil.newBookie();
|
||||||
|
int ensembleSize = numBookies + 1;
|
||||||
|
int quorumSize = numBookies + 1;
|
||||||
|
// ensure that the journal manager has to use all bookies,
|
||||||
|
// so that a failure will fail the journal manager
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
|
||||||
|
ensembleSize);
|
||||||
|
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
|
||||||
|
quorumSize);
|
||||||
|
// sets 2 secs
|
||||||
|
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
|
||||||
|
2);
|
||||||
|
NamespaceInfo nsi = newNSInfo();
|
||||||
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||||
|
BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
CountDownLatch sleepLatch = new CountDownLatch(1);
|
||||||
|
sleepBookie(sleepLatch, newBookie);
|
||||||
|
|
||||||
|
EditLogOutputStream out = bkjm.startLogSegment(1,
|
||||||
|
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
||||||
|
int numTransactions = 100;
|
||||||
|
for (long i = 1; i <= numTransactions; i++) {
|
||||||
|
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
||||||
|
op.setTransactionId(i);
|
||||||
|
out.write(op);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
out.close();
|
||||||
|
bkjm.finalizeLogSegment(1, numTransactions);
|
||||||
|
|
||||||
|
List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
|
||||||
|
bkjm.selectInputStreams(in, 1, true);
|
||||||
|
try {
|
||||||
|
assertEquals(numTransactions,
|
||||||
|
FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
|
||||||
|
} finally {
|
||||||
|
in.get(0).close();
|
||||||
|
}
|
||||||
|
fail("Should throw exception as not enough non-faulty bookies available!");
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test ack quorum feature supported by bookkeeper. Keep ack quorum bookie
|
||||||
|
* alive and sleep all the other bookies. Now the client would wait for the
|
||||||
|
* acknowledgement from the ack size bookies and after receiving the success
|
||||||
|
* response will continue writing. Non ack client will hang long time to add
|
||||||
|
* entries.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testAckQuorum() throws Exception {
|
||||||
|
// slow bookie
|
||||||
|
newBookie = bkutil.newBookie();
|
||||||
|
// make quorum size and ensemble size same to avoid the interleave writing
|
||||||
|
// of the ledger entries
|
||||||
|
int ensembleSize = numBookies + 1;
|
||||||
|
int quorumSize = numBookies + 1;
|
||||||
|
int ackSize = numBookies;
|
||||||
|
// ensure that the journal manager has to use all bookies,
|
||||||
|
// so that a failure will fail the journal manager
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
|
||||||
|
ensembleSize);
|
||||||
|
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
|
||||||
|
quorumSize);
|
||||||
|
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ACK_QUORUM_SIZE,
|
||||||
|
ackSize);
|
||||||
|
// sets 60 minutes
|
||||||
|
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
|
||||||
|
3600);
|
||||||
|
|
||||||
|
NamespaceInfo nsi = newNSInfo();
|
||||||
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||||
|
BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
CountDownLatch sleepLatch = new CountDownLatch(1);
|
||||||
|
sleepBookie(sleepLatch, newBookie);
|
||||||
|
|
||||||
|
EditLogOutputStream out = bkjm.startLogSegment(1,
|
||||||
|
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
||||||
|
int numTransactions = 100;
|
||||||
|
for (long i = 1; i <= numTransactions; i++) {
|
||||||
|
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
||||||
|
op.setTransactionId(i);
|
||||||
|
out.write(op);
|
||||||
|
}
|
||||||
|
out.close();
|
||||||
|
bkjm.finalizeLogSegment(1, numTransactions);
|
||||||
|
|
||||||
|
List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
|
||||||
|
bkjm.selectInputStreams(in, 1, true);
|
||||||
|
try {
|
||||||
|
assertEquals(numTransactions,
|
||||||
|
FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
|
||||||
|
} finally {
|
||||||
|
sleepLatch.countDown();
|
||||||
|
in.get(0).close();
|
||||||
|
bkjm.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sleep a bookie until I count down the latch
|
||||||
|
*
|
||||||
|
* @param latch
|
||||||
|
* Latch to wait on
|
||||||
|
* @param bookie
|
||||||
|
* bookie server
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private void sleepBookie(final CountDownLatch l, final BookieServer bookie)
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
Thread sleeper = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
bookie.suspendProcessing();
|
||||||
|
l.await(60, TimeUnit.SECONDS);
|
||||||
|
bookie.resumeProcessing();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error suspending bookie", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
|
||||||
|
sleeper.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
|
private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
|
||||||
int startTxid, int endTxid) throws IOException, KeeperException,
|
int startTxid, int endTxid) throws IOException, KeeperException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
|
|
Loading…
Reference in New Issue