Uwe Schindler 2016-02-05 00:07:50 +01:00
commit 4569fd732a
5 changed files with 213 additions and 12 deletions

solr/CHANGES.txt

@@ -445,6 +445,8 @@ Bug Fixes
 * SOLR-8607: The Schema API refuses to add new fields that match existing dynamic fields.
   (Jan Høydahl, Steve Rowe)
 
+* SOLR-8575: Fix HDFSLogReader replay status numbers and a performance bug where we can reopen
+  FSDataInputStream too often. (Mark Miller, Patrick Dvorack)
 
 Optimizations
 ----------------------
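
The performance half of that entry comes down to where the reader learns the tlog's current length. As the HdfsTransactionLog diff below shows, the old code asked HDFS for it (fs.getFileStatus(tlogFile).getLen(), a NameNode round trip) on every reopen; the patched code reads the size from the in-process writer instead. A minimal self-contained sketch of that difference, with illustrative names (SizePollSketch, Writer) that are not part of the Solr code:

// Sketch only -- not the patched Solr code. Shows why a writer-side size
// check is cheaper than asking the filesystem for the file's length.
public class SizePollSketch {

  // Stand-in for the transaction log writer living in the same JVM (like fos).
  static class Writer {
    private long bytesWritten;
    synchronized void append(int n) { bytesWritten += n; }
    synchronized long size() { return bytesWritten; }   // in-memory, no RPC
  }

  public static void main(String[] args) {
    Writer fos = new Writer();
    fos.append(1024);
    // Cheap, local size check -- the pattern the patch moves to:
    long sz = fos.size();
    System.out.println("writer-side size = " + sz);
    // The pattern it moves away from would be an HDFS call like
    // fs.getFileStatus(tlogFile).getLen(): a NameNode round trip per check.
  }
}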

solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java

@@ -390,16 +390,18 @@ public class HdfsTransactionLog extends TransactionLog {
       // we actually need a new reader to
       // see if any data was added by the writer
-      if (fis.position() >= sz) {
+      if (pos >= sz) {
+        log.info("Read available inputstream data, opening new inputstream pos={} sz={}", pos, sz);
+
+        synchronized (HdfsTransactionLog.this) {
+          sz = fos.size();
+        }
+
         fis.close();
         tlogOutStream.hflush();
-        try {
-          FSDataInputStream fdis = fs.open(tlogFile);
-          fis = new FSDataFastInputStream(fdis, pos);
-          sz = fs.getFileStatus(tlogFile).getLen();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
+        FSDataInputStream fdis = fs.open(tlogFile);
+        fis = new FSDataFastInputStream(fdis, pos);
       }
 
       if (pos == 0) {
@@ -446,7 +448,7 @@ public class HdfsTransactionLog extends TransactionLog {
   @Override
   public long currentSize() {
-    return sz;
+    return fos.size();
   }
 }
@@ -604,5 +606,3 @@ class FSDataFastInputStream extends FastInputStream {
     return "readFromStream="+readFromStream +" pos="+pos +" end="+end + " bufferPos="+getBufferPos() + " position="+position() ;
   }
 }
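
The net effect of these hunks: the reader only pays for a close()/fs.open() cycle once its logical position pos has caught up with the last known size sz, and sz itself is refreshed from the colocated writer (fos.size(), under the log's monitor) rather than via getFileStatus(), while currentSize() now reports the live writer size instead of a stale cached value. A loose, self-contained sketch of that gating pattern, with stand-in types (Log, Reader) that simplify the real control flow:

import java.util.ArrayList;
import java.util.List;

// Sketch only -- stand-ins for the tlog file, fos, and HDFSLogReader.
// Illustrates the gate: refresh size and "reopen" only when caught up.
public class TailReadSketch {

  static class Log {
    private final List<String> records = new ArrayList<>();
    synchronized void append(String r) { records.add(r); }
    synchronized int size() { return records.size(); }     // like fos.size()
    synchronized String read(int pos) { return records.get(pos); }
  }

  static class Reader {
    private final Log log;
    private int pos;       // logical read position
    private int sz;        // last known size
    private int reopens;   // stands in for fis.close() + fs.open()

    Reader(Log log) { this.log = log; }

    String next() {
      if (pos >= sz) {              // caught up with everything we know about
        sz = log.size();            // cheap in-process refresh, no RPC
        if (pos >= sz) return null; // truly nothing new: skip the reopen
        reopens++;                  // pay for the reopen once, not per read
      }
      return log.read(pos++);       // ordinary read: old stream still fine
    }
  }

  public static void main(String[] args) {
    Log log = new Log();
    for (int i = 0; i < 5; i++) log.append("rec" + i);
    Reader r = new Reader(log);
    int reads = 0;
    while (r.next() != null) reads++;
    System.out.println("reads=" + reads + " reopens=" + r.reopens);
  }
}

Running it prints reads=5 reopens=1: the expensive path runs once to discover the appended records, not once per record, which is the "reopen FSDataInputStream too often" bug in miniature.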

solr/core/src/java/org/apache/solr/update/UpdateLog.java

@@ -1324,7 +1324,7 @@ public class UpdateLog implements PluginInfoInitialized {
         loglog.info(
             "log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
             translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
-            Math.round(cpos / (double) csize * 100.));
+            Math.floor(cpos / (double) csize * 100.));
       }
     }
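
Why floor rather than round: near the end of replay, rounding pushes the reported percentage up to 100 while the replayer is still working, which is exactly the misleading status number this commit fixes; flooring only reports 100 once cpos has actually reached csize. A quick standalone check with illustrative values:

// Illustrative check of round vs. floor for the "% read" status value.
public class PercentReadSketch {
  public static void main(String[] args) {
    long cpos = 9995, csize = 10000;              // replay not finished yet
    double pct = cpos / (double) csize * 100.;    // 99.95
    System.out.println(Math.round(pct));  // 100  -- looks complete, but is not
    System.out.println(Math.floor(pct));  // 99.0 -- hits 100 only at true completion
  }
}

(Math.round(double) returns a long, Math.floor a double, hence 100 vs 99.0 in the output.)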

solr/core/src/test/org/apache/solr/cloud/TlogReplayBufferedWhileIndexingTest.java (new file)

@@ -0,0 +1,136 @@
package org.apache.solr.cloud;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.common.SolrInputDocument;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

@Slow
@Nightly
@SuppressSSL
public class TlogReplayBufferedWhileIndexingTest extends AbstractFullDistribZkTestBase {

  private List<StoppableIndexingThread> threads;

  public TlogReplayBufferedWhileIndexingTest() throws Exception {
    super();
    sliceCount = 1;
    fixShardCount(2);
    schemaString = "schema15.xml";      // we need a string id
  }

  @BeforeClass
  public static void beforeRestartWhileUpdatingTest() throws Exception {
    System.setProperty("leaderVoteWait", "300000");
    System.setProperty("solr.autoCommit.maxTime", "10000");
    System.setProperty("solr.autoSoftCommit.maxTime", "3000");
    if (System.getProperty("solr.hdfs.home") != null) useFactory("solr.StandardDirectoryFactory");
  }

  @AfterClass
  public static void afterRestartWhileUpdatingTest() {
    System.clearProperty("leaderVoteWait");
    System.clearProperty("solr.autoCommit.maxTime");
    System.clearProperty("solr.autoSoftCommit.maxTime");
  }

  @Test
  public void test() throws Exception {
    handle.clear();
    handle.put("timestamp", SKIPVAL);

    waitForRecoveriesToFinish(false);

    int numThreads = 1;

    threads = new ArrayList<>(numThreads);

    // stop the replica that is not the leader so that updates
    // are buffered for it while indexing continues
    ArrayList<JettySolrRunner> allJetty = new ArrayList<>();
    allJetty.addAll(jettys);
    allJetty.remove(shardToLeaderJetty.get("shard1").jetty);
    assert allJetty.size() == 1 : allJetty.size();
    ChaosMonkey.stop(allJetty.get(0));

    StoppableIndexingThread indexThread;
    for (int i = 0; i < numThreads; i++) {
      indexThread = new StoppableIndexingThread(controlClient, cloudClient, Integer.toString(i), false, 50000, 1, false);
      threads.add(indexThread);
      indexThread.start();
    }

    Thread.sleep(2000);

    // bring the replica back; it must replay the buffered tlog
    // while the indexing threads keep adding documents
    ChaosMonkey.start(allJetty.get(0));

    Thread.sleep(45000);

    waitForThingsToLevelOut(320);

    Thread.sleep(2000);

    waitForRecoveriesToFinish(DEFAULT_COLLECTION, cloudClient.getZkStateReader(), false, true);

    for (StoppableIndexingThread thread : threads) {
      thread.safeStop();
    }

    waitForThingsToLevelOut(30);

    checkShardConsistency(false, false);
  }

  @Override
  protected void indexDoc(SolrInputDocument doc) throws IOException, SolrServerException {
    cloudClient.add(doc);
  }

  @Override
  public void distribTearDown() throws Exception {
    // make sure threads have been stopped...
    if (threads != null) {
      for (StoppableIndexingThread thread : threads) {
        thread.safeStop();
      }
    }

    super.distribTearDown();
  }

  // skip the randoms - they can deadlock...
  @Override
  protected void indexr(Object... fields) throws Exception {
    SolrInputDocument doc = new SolrInputDocument();
    addFields(doc, fields);
    addFields(doc, "rnd_b", true);
    indexDoc(doc);
  }
}

solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTlogReplayBufferedWhileIndexingTest.java (new file)

@@ -0,0 +1,63 @@
package org.apache.solr.cloud.hdfs;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.cloud.TlogReplayBufferedWhileIndexingTest;
import org.apache.solr.util.BadHdfsThreadsFilter;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import com.carrotsearch.randomizedtesting.annotations.Nightly;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

@Slow
@Nightly
@ThreadLeakFilters(defaultFilters = true, filters = {
    BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
})
public class HdfsTlogReplayBufferedWhileIndexingTest extends TlogReplayBufferedWhileIndexingTest {

  public HdfsTlogReplayBufferedWhileIndexingTest() throws Exception {
    super();
  }

  private static MiniDFSCluster dfsCluster;

  @BeforeClass
  public static void setupClass() throws Exception {
    dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
    System.setProperty("solr.hdfs.blockcache.blocksperbank", "2048");
  }

  @AfterClass
  public static void teardownClass() throws Exception {
    HdfsTestUtil.teardownClass(dfsCluster);
    dfsCluster = null;
  }

  // same scenario as the parent test, but with index and tlog data on HDFS
  @Override
  protected String getDataDir(String dataDir) throws IOException {
    return HdfsTestUtil.getDataDir(dfsCluster, dataDir);
  }
}
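
Both test classes are marked @Slow and @Nightly, so default test runs skip them. With the ant-based build lucene-solr used at the time, a single nightly test was typically run with something like the following (flags quoted from memory of that build and may differ):

    ant test -Dtestcase=HdfsTlogReplayBufferedWhileIndexingTest -Dtests.nightly=true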