SOLR-4531: Add tests to ensure that recovery does not fail on corrupted tlogs

This commit is contained in:
Shalin Shekhar Mangar 2016-10-25 12:13:08 +05:30
parent ce57e8a8f4
commit b7aa582dff
3 changed files with 158 additions and 78 deletions

View File

@ -341,6 +341,9 @@ Other Changes
Last JVM garbage collection log solr_gc.log is moved into $SOLR_LOGS_DIR/archived/ Last JVM garbage collection log solr_gc.log is moved into $SOLR_LOGS_DIR/archived/
(janhoy) (janhoy)
* SOLR-4531: Add tests to ensure that recovery does not fail on corrupted tlogs.
(Simon Scofield, Cao Manh Dat via shalin)
================== 6.2.1 ================== ================== 6.2.1 ==================
Bug Fixes Bug Fixes

View File

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.cloud.ClusterStateUtil;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.UpdateLog;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestCloudRecovery extends SolrCloudTestCase {
private static final String COLLECTION = "collection1";
@BeforeClass
public static void setupCluster() throws Exception {
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
configureCluster(2)
.addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
CollectionAdminRequest
.createCollection(COLLECTION, "config", 2, 2)
.setMaxShardsPerNode(2)
.process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, 30);
}
@Before
public void resetCollection() throws IOException, SolrServerException {
cluster.getSolrClient().deleteByQuery(COLLECTION, "*:*");
cluster.getSolrClient().commit(COLLECTION);
}
@Test
public void leaderRecoverFromLogOnStartupTest() throws Exception {
AtomicInteger countReplayLog = new AtomicInteger(0);
DirectUpdateHandler2.commitOnClose = false;
UpdateLog.testing_logReplayFinishHook = countReplayLog::incrementAndGet;
CloudSolrClient cloudClient = cluster.getSolrClient();
cloudClient.add(COLLECTION, sdoc("id", "1"));
cloudClient.add(COLLECTION, sdoc("id", "2"));
cloudClient.add(COLLECTION, sdoc("id", "3"));
cloudClient.add(COLLECTION, sdoc("id", "4"));
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("q", "*:*");
QueryResponse resp = cloudClient.query(COLLECTION, params);
assertEquals(0, resp.getResults().getNumFound());
ChaosMonkey.stop(cluster.getJettySolrRunners());
assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllReplicasNotLive(cloudClient.getZkStateReader(), 45000));
ChaosMonkey.start(cluster.getJettySolrRunners());
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), COLLECTION, 120000));
resp = cloudClient.query(COLLECTION, params);
assertEquals(4, resp.getResults().getNumFound());
// Make sure all nodes is recover from tlog
assertEquals(4, countReplayLog.get());
}
@Test
public void corruptedLogTest() throws Exception {
AtomicInteger countReplayLog = new AtomicInteger(0);
DirectUpdateHandler2.commitOnClose = false;
UpdateLog.testing_logReplayFinishHook = countReplayLog::incrementAndGet;
CloudSolrClient cloudClient = cluster.getSolrClient();
cloudClient.add(COLLECTION, sdoc("id", "1000"));
cloudClient.add(COLLECTION, sdoc("id", "1001"));
for (int i = 0; i < 10; i++) {
cloudClient.add(COLLECTION, sdoc("id", String.valueOf(i)));
}
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("q", "*:*");
QueryResponse resp = cloudClient.query(COLLECTION, params);
assertEquals(0, resp.getResults().getNumFound());
int logHeaderSize = Integer.MAX_VALUE;
Map<File, byte[]> contentFiles = new HashMap<>();
for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) {
File tlogFolder = new File(solrCore.getUlogDir(), UpdateLog.TLOG_NAME);
String[] tLogFiles = tlogFolder.list();
Arrays.sort(tLogFiles);
File lastTLogFile = new File(tlogFolder.getAbsolutePath() + "/" + tLogFiles[tLogFiles.length - 1]);
byte[] tlogBytes = IOUtils.toByteArray(new FileInputStream(lastTLogFile));
contentFiles.put(lastTLogFile, tlogBytes);
logHeaderSize = Math.min(tlogBytes.length, logHeaderSize);
}
}
ChaosMonkey.stop(cluster.getJettySolrRunners());
assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllReplicasNotLive(cloudClient.getZkStateReader(), 45000));
for (Map.Entry<File, byte[]> entry : contentFiles.entrySet()) {
byte[] tlogBytes = entry.getValue();
if (tlogBytes.length <= logHeaderSize) continue;
FileOutputStream stream = new FileOutputStream(entry.getKey());
int skipLastBytes = Math.max(random().nextInt(tlogBytes.length - logHeaderSize), 2);
for (int i = 0; i < entry.getValue().length - skipLastBytes; i++) {
stream.write(tlogBytes[i]);
}
stream.close();
}
ChaosMonkey.start(cluster.getJettySolrRunners());
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), COLLECTION, 120000));
resp = cloudClient.query(COLLECTION, params);
// Make sure cluster still healthy
assertTrue(resp.getResults().getNumFound() >= 2);
}
}

View File

@ -1,77 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.cloud.ClusterStateUtil;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.UpdateLog;
import org.junit.Test;
public class TestLeaderRecoverFromLogOnStartup extends AbstractFullDistribZkTestBase {
@Override
public void distribSetUp() throws Exception {
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
super.distribSetUp();
}
@Test
@ShardsFixed(num = 4)
public void test() throws Exception {
AtomicInteger countReplayLog = new AtomicInteger(0);
DirectUpdateHandler2.commitOnClose = false;
UpdateLog.testing_logReplayFinishHook = new Runnable() {
@Override
public void run() {
countReplayLog.incrementAndGet();
}
};
String testCollectionName = "testCollection";
createCollection(testCollectionName, 2, 2, 1);
waitForRecoveriesToFinish(false);
cloudClient.setDefaultCollection(testCollectionName);
cloudClient.add(sdoc("id", "1"));
cloudClient.add(sdoc("id", "2"));
cloudClient.add(sdoc("id", "3"));
cloudClient.add(sdoc("id", "4"));
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("q", "*:*");
QueryResponse resp = cloudClient.query(params);
assertEquals(0, resp.getResults().getNumFound());
ChaosMonkey.stop(jettys);
ChaosMonkey.stop(controlJetty);
assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllReplicasNotLive(cloudClient.getZkStateReader(), 45000));
ChaosMonkey.start(jettys);
ChaosMonkey.start(controlJetty);
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), testCollectionName, 120000));
cloudClient.commit();
resp = cloudClient.query(params);
assertEquals(4, resp.getResults().getNumFound());
// Make sure all nodes is recover from tlog
assertEquals(4, countReplayLog.get());
}
}