HDFS-4265. BKJM doesn't take advantage of speculative reads. Contributed by Rakesh R.
(cherry picked from commit 0d521e3326
)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
This commit is contained in:
parent
7b9c978df2
commit
3ad1d8d533
|
@ -665,6 +665,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
HDFS-7776. Adding additional unit tests for Quota By Storage Type.
|
HDFS-7776. Adding additional unit tests for Quota By Storage Type.
|
||||||
(Xiaoyu Yao via Arpit Agarwal)
|
(Xiaoyu Yao via Arpit Agarwal)
|
||||||
|
|
||||||
|
HDFS-4625. BKJM doesn't take advantage of speculative reads. (Rakesh R
|
||||||
|
via aajisaka)
|
||||||
|
|
||||||
Release 2.6.1 - UNRELEASED
|
Release 2.6.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.VersionProto;
|
||||||
import com.google.protobuf.TextFormat;
|
import com.google.protobuf.TextFormat;
|
||||||
import static com.google.common.base.Charsets.UTF_8;
|
import static com.google.common.base.Charsets.UTF_8;
|
||||||
|
|
||||||
|
import org.apache.commons.io.Charsets;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -142,6 +143,15 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT
|
public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT
|
||||||
= "/ledgers/available";
|
= "/ledgers/available";
|
||||||
|
|
||||||
|
public static final String BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS
|
||||||
|
= "dfs.namenode.bookkeeperjournal.speculativeReadTimeoutMs";
|
||||||
|
public static final int BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT
|
||||||
|
= 2000;
|
||||||
|
|
||||||
|
public static final String BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC
|
||||||
|
= "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec";
|
||||||
|
public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5;
|
||||||
|
|
||||||
private ZooKeeper zkc;
|
private ZooKeeper zkc;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final BookKeeper bkc;
|
private final BookKeeper bkc;
|
||||||
|
@ -153,6 +163,8 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
private final int ensembleSize;
|
private final int ensembleSize;
|
||||||
private final int quorumSize;
|
private final int quorumSize;
|
||||||
private final String digestpw;
|
private final String digestpw;
|
||||||
|
private final int speculativeReadTimeout;
|
||||||
|
private final int readEntryTimeout;
|
||||||
private final CountDownLatch zkConnectLatch;
|
private final CountDownLatch zkConnectLatch;
|
||||||
private final NamespaceInfo nsInfo;
|
private final NamespaceInfo nsInfo;
|
||||||
private boolean initialized = false;
|
private boolean initialized = false;
|
||||||
|
@ -172,6 +184,11 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
|
BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
|
||||||
quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
|
quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
|
||||||
BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
|
BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
|
||||||
|
speculativeReadTimeout = conf.getInt(
|
||||||
|
BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
|
||||||
|
BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT);
|
||||||
|
readEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC,
|
||||||
|
BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT);
|
||||||
|
|
||||||
ledgerPath = basePath + "/ledgers";
|
ledgerPath = basePath + "/ledgers";
|
||||||
String maxTxIdPath = basePath + "/maxtxid";
|
String maxTxIdPath = basePath + "/maxtxid";
|
||||||
|
@ -196,7 +213,10 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
prepareBookKeeperEnv();
|
prepareBookKeeperEnv();
|
||||||
bkc = new BookKeeper(new ClientConfiguration(), zkc);
|
ClientConfiguration clientConf = new ClientConfiguration();
|
||||||
|
clientConf.setSpeculativeReadTimeout(speculativeReadTimeout);
|
||||||
|
clientConf.setReadEntryTimeout(readEntryTimeout);
|
||||||
|
bkc = new BookKeeper(clientConf, zkc);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
throw new IOException("Error initializing zk", e);
|
throw new IOException("Error initializing zk", e);
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
|
@ -385,7 +405,7 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
}
|
}
|
||||||
currentLedger = bkc.createLedger(ensembleSize, quorumSize,
|
currentLedger = bkc.createLedger(ensembleSize, quorumSize,
|
||||||
BookKeeper.DigestType.MAC,
|
BookKeeper.DigestType.MAC,
|
||||||
digestpw.getBytes());
|
digestpw.getBytes(Charsets.UTF_8));
|
||||||
} catch (BKException bke) {
|
} catch (BKException bke) {
|
||||||
throw new IOException("Error creating ledger", bke);
|
throw new IOException("Error creating ledger", bke);
|
||||||
} catch (KeeperException ke) {
|
} catch (KeeperException ke) {
|
||||||
|
@ -522,10 +542,10 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
LedgerHandle h;
|
LedgerHandle h;
|
||||||
if (l.isInProgress()) { // we don't want to fence the current journal
|
if (l.isInProgress()) { // we don't want to fence the current journal
|
||||||
h = bkc.openLedgerNoRecovery(l.getLedgerId(),
|
h = bkc.openLedgerNoRecovery(l.getLedgerId(),
|
||||||
BookKeeper.DigestType.MAC, digestpw.getBytes());
|
BookKeeper.DigestType.MAC, digestpw.getBytes(Charsets.UTF_8));
|
||||||
} else {
|
} else {
|
||||||
h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
|
h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
|
||||||
digestpw.getBytes());
|
digestpw.getBytes(Charsets.UTF_8));
|
||||||
}
|
}
|
||||||
elis = new BookKeeperEditLogInputStream(h, l);
|
elis = new BookKeeperEditLogInputStream(h, l);
|
||||||
elis.skipTo(fromTxId);
|
elis.skipTo(fromTxId);
|
||||||
|
@ -732,11 +752,11 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
if (fence) {
|
if (fence) {
|
||||||
lh = bkc.openLedger(l.getLedgerId(),
|
lh = bkc.openLedger(l.getLedgerId(),
|
||||||
BookKeeper.DigestType.MAC,
|
BookKeeper.DigestType.MAC,
|
||||||
digestpw.getBytes());
|
digestpw.getBytes(Charsets.UTF_8));
|
||||||
} else {
|
} else {
|
||||||
lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
|
lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
|
||||||
BookKeeper.DigestType.MAC,
|
BookKeeper.DigestType.MAC,
|
||||||
digestpw.getBytes());
|
digestpw.getBytes(Charsets.UTF_8));
|
||||||
}
|
}
|
||||||
} catch (BKException bke) {
|
} catch (BKException bke) {
|
||||||
throw new IOException("Exception opening ledger for " + l, bke);
|
throw new IOException("Exception opening ledger for " + l, bke);
|
||||||
|
|
|
@ -0,0 +1,167 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.contrib.bkjournal;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.bookkeeper.proto.BookieServer;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestBookKeeperSpeculativeRead {
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(TestBookKeeperSpeculativeRead.class);
|
||||||
|
|
||||||
|
private ZooKeeper zkc;
|
||||||
|
private static BKJMUtil bkutil;
|
||||||
|
private static int numLocalBookies = 1;
|
||||||
|
private static List<BookieServer> bks = new ArrayList<BookieServer>();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupBookkeeper() throws Exception {
|
||||||
|
bkutil = new BKJMUtil(1);
|
||||||
|
bkutil.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void teardownBookkeeper() throws Exception {
|
||||||
|
bkutil.teardown();
|
||||||
|
for (BookieServer bk : bks) {
|
||||||
|
bk.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
zkc = BKJMUtil.connectZooKeeper();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void teardown() throws Exception {
|
||||||
|
zkc.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private NamespaceInfo newNSInfo() {
|
||||||
|
Random r = new Random();
|
||||||
|
return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test speculative read feature supported by bookkeeper. Keep one bookie
|
||||||
|
* alive and sleep all the other bookies. Non spec client will hang for long
|
||||||
|
* time to read the entries from the bookkeeper.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testSpeculativeRead() throws Exception {
|
||||||
|
// starting 9 more servers
|
||||||
|
for (int i = 1; i < 10; i++) {
|
||||||
|
bks.add(bkutil.newBookie());
|
||||||
|
}
|
||||||
|
NamespaceInfo nsi = newNSInfo();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
int ensembleSize = numLocalBookies + 9;
|
||||||
|
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
|
||||||
|
ensembleSize);
|
||||||
|
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
|
||||||
|
ensembleSize);
|
||||||
|
conf.setInt(
|
||||||
|
BookKeeperJournalManager.BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
|
||||||
|
100);
|
||||||
|
// sets 60 minute
|
||||||
|
conf.setInt(
|
||||||
|
BookKeeperJournalManager.BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC, 3600);
|
||||||
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||||
|
BKJMUtil.createJournalURI("/hdfsjournal-specread"), nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
|
final long numTransactions = 1000;
|
||||||
|
EditLogOutputStream out = bkjm.startLogSegment(1,
|
||||||
|
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
||||||
|
for (long i = 1; i <= numTransactions; i++) {
|
||||||
|
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
||||||
|
op.setTransactionId(i);
|
||||||
|
out.write(op);
|
||||||
|
}
|
||||||
|
out.close();
|
||||||
|
bkjm.finalizeLogSegment(1, numTransactions);
|
||||||
|
|
||||||
|
List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
|
||||||
|
bkjm.selectInputStreams(in, 1, true);
|
||||||
|
|
||||||
|
// sleep 9 bk servers. Now only one server is running and responding to the
|
||||||
|
// clients
|
||||||
|
CountDownLatch sleepLatch = new CountDownLatch(1);
|
||||||
|
for (final BookieServer bookie : bks) {
|
||||||
|
sleepBookie(sleepLatch, bookie);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
assertEquals(numTransactions,
|
||||||
|
FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
|
||||||
|
} finally {
|
||||||
|
in.get(0).close();
|
||||||
|
sleepLatch.countDown();
|
||||||
|
bkjm.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sleep a bookie until I count down the latch
|
||||||
|
*
|
||||||
|
* @param latch
|
||||||
|
* latch to wait on
|
||||||
|
* @param bookie
|
||||||
|
* bookie server
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private void sleepBookie(final CountDownLatch latch, final BookieServer bookie)
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
Thread sleeper = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
bookie.suspendProcessing();
|
||||||
|
latch.await(2, TimeUnit.MINUTES);
|
||||||
|
bookie.resumeProcessing();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error suspending bookie", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
|
||||||
|
sleeper.start();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue