SOLR-7437: Make HDFS transaction log replication factor configurable.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1676520 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2015-04-28 12:48:44 +00:00
parent b70ecaa3fd
commit 4809642e63
5 changed files with 41 additions and 9 deletions
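For users, the new knob surfaces as an <updateLog> init argument (see the solrconfig.xml hunk below). A minimal sketch of how a deployment might set it, assuming the stock solr.DirectUpdateHandler2 and solr.HdfsUpdateLog classes; the value of 3 is purely illustrative:

  <updateHandler class="solr.DirectUpdateHandler2">
    <updateLog class="solr.HdfsUpdateLog">
      <str name="dir">${solr.ulog.dir:}</str>
      <!-- Requested HDFS replication factor for each new transaction log file.
           If omitted, HdfsUpdateLog falls back to 1 (the previously hard-coded value). -->
      <int name="tlogDfsReplication">3</int>
    </updateLog>
  </updateHandler>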


@@ -151,6 +151,8 @@ New Features
   Example: prices:{ type:range, field:price, mincount:1, start:0, end:100, gap:10 }
   (yonik)
 
+* SOLR-7437: Make HDFS transaction log replication factor configurable. (Mark Miller)
 
 Bug Fixes
 ----------------------


@@ -68,11 +68,11 @@ public class HdfsTransactionLog extends TransactionLog {
   private volatile boolean isClosed = false;
 
-  HdfsTransactionLog(FileSystem fs, Path tlogFile, Collection<String> globalStrings) {
-    this(fs, tlogFile, globalStrings, false);
+  HdfsTransactionLog(FileSystem fs, Path tlogFile, Collection<String> globalStrings, Integer tlogDfsReplication) {
+    this(fs, tlogFile, globalStrings, false, tlogDfsReplication);
   }
 
-  HdfsTransactionLog(FileSystem fs, Path tlogFile, Collection<String> globalStrings, boolean openExisting) {
+  HdfsTransactionLog(FileSystem fs, Path tlogFile, Collection<String> globalStrings, boolean openExisting, Integer tlogDfsReplication) {
     super();
     boolean success = false;
     this.fs = fs;
@@ -95,7 +95,7 @@ public class HdfsTransactionLog extends TransactionLog {
       } else {
         fs.delete(tlogFile, false);
-        tlogOutStream = fs.create(tlogFile, (short)1);
+        tlogOutStream = fs.create(tlogFile, (short)tlogDfsReplication.intValue());
         tlogOutStream.hsync();
       }
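The HdfsTransactionLog change above leans on Hadoop's FileSystem API, which accepts a per-file replication factor when a file is created. A minimal standalone sketch of the calls involved (not part of the commit; the path and replication value are illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class TlogReplicationSketch {
    public static void main(String[] args) throws Exception {
      FileSystem fs = FileSystem.get(new Configuration());
      Path tlog = new Path("/tmp/tlog.0000000000000000001"); // illustrative path

      // Request three replicas for this file, analogous to what HdfsTransactionLog
      // does with the configured tlogDfsReplication value.
      short replication = 3;
      FSDataOutputStream out = fs.create(tlog, replication);
      out.hsync(); // flush to the datanodes, mirroring the hsync() after create above
      out.close();

      // FileStatus.getReplication() reports the factor back, which is how the new
      // TestRecoveryHdfs test below verifies the behavior.
      FileStatus status = fs.getFileStatus(tlog);
      System.out.println("requested=" + replication + " actual=" + status.getReplication());
    }
  }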


@@ -48,6 +48,7 @@ public class HdfsUpdateLog extends UpdateLog {
   private FileSystem fs;
   private volatile Path tlogDir;
   private final String confDir;
+  private Integer tlogDfsReplication;
 
   // used internally by tests to track total count of failed tran log loads in init
   public static AtomicLong INIT_FAILED_LOGS_COUNT = new AtomicLong();
@@ -87,9 +88,12 @@ public class HdfsUpdateLog extends UpdateLog {
     numRecordsToKeep = objToInt(info.initArgs.get("numRecordsToKeep"), 100);
     maxNumLogsToKeep = objToInt(info.initArgs.get("maxNumLogsToKeep"), 10);
+    tlogDfsReplication = (Integer) info.initArgs.get( "tlogDfsReplication");
+    if (tlogDfsReplication == null) tlogDfsReplication = 1;
 
-    log.info("Initializing HdfsUpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={}",
-        dataDir, defaultSyncLevel, numRecordsToKeep, maxNumLogsToKeep);
+    log.info("Initializing HdfsUpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={} tlogDfsReplication={}",
+        dataDir, defaultSyncLevel, numRecordsToKeep, maxNumLogsToKeep, tlogDfsReplication);
   }
 
   private Configuration getConf() {
@@ -186,7 +190,7 @@ public class HdfsUpdateLog extends UpdateLog {
     for (String oldLogName : tlogFiles) {
       Path f = new Path(tlogDir, oldLogName);
       try {
-        oldLog = new HdfsTransactionLog(fs, f, null, true);
+        oldLog = new HdfsTransactionLog(fs, f, null, true, tlogDfsReplication);
         addOldLog(oldLog, false); // don't remove old logs on startup since more
                                   // than one may be uncapped.
       } catch (Exception e) {
@@ -296,7 +300,7 @@ public class HdfsUpdateLog extends UpdateLog {
       String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN,
           TLOG_NAME, id);
       HdfsTransactionLog ntlog = new HdfsTransactionLog(fs, new Path(tlogDir, newLogName),
-          globalStrings);
+          globalStrings, tlogDfsReplication);
       tlog = ntlog;
 
       if (tlog != ntlog) {
@@ -333,7 +337,7 @@ public class HdfsUpdateLog extends UpdateLog {
     }
   }
 
-  private String[] getLogList(Path tlogDir) throws FileNotFoundException, IOException {
+  public String[] getLogList(Path tlogDir) throws FileNotFoundException, IOException {
     final String prefix = TLOG_NAME+'.';
     FileStatus[] files = fs.listStatus(tlogDir, new PathFilter() {
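Two details worth noting in the HdfsUpdateLog changes above: when no tlogDfsReplication init argument is supplied the value falls back to 1, matching the previously hard-coded fs.create(tlogFile, (short)1) call, so existing configurations keep writing single-replica tlogs; and getLogList() is widened from private to public so the new test can list the tlog files and check their replication.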


@@ -51,6 +51,7 @@
     <str name="dir">${solr.ulog.dir:}</str>
     <str name="maxNumLogsToKeep">${solr.ulog.maxNumLogsToKeep:10}</str>
     <str name="numRecordsToKeep">${solr.ulog.numRecordsToKeep:100}</str>
+    <int name="tlogDfsReplication">${solr.ulog.tlogDfsReplication:2}</int>
   </updateLog>
 </updateHandler>
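The ${solr.ulog.tlogDfsReplication:2} form is Solr's usual property substitution with a default, so this test config requests a replication factor of 2 unless the solr.ulog.tlogDfsReplication system property is set (for example via -Dsolr.ulog.tlogDfsReplication=3 at startup). Assuming the property is left unset during the test run, that default of 2 is what the new testReplicationFactor test below asserts against.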


@@ -37,7 +37,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.solr.SolrTestCaseJ4;
@@ -119,7 +121,30 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
     }
   }
 
+  @Test
+  public void testReplicationFactor() throws Exception {
+    clearIndex();
+
+    HdfsUpdateLog ulog = (HdfsUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
+
+    assertU(commit());
+    addAndGetVersion(sdoc("id", "REP1"), null);
+    assertU(commit());
+
+    String[] logList = ulog.getLogList(new Path(ulog.getLogDir()));
+    boolean foundRep2 = false;
+    for (String tl : logList) {
+      FileStatus status = fs.getFileStatus(new Path(ulog.getLogDir(), tl));
+      if (status.getReplication() == 2) {
+        foundRep2 = true;
+        break;
+      }
+    }
+
+    assertTrue("Expected to find tlogs with a replication factor of 2", foundRep2);
+  }
+
   @Test
   public void testLogReplay() throws Exception {
     try {