Mirror of https://github.com/honeymoose/OpenSearch.git, synced 2025-02-17 02:14:54 +00:00
more recovery-under-load tests, now with node shutdowns; ignore node-connection exceptions or retry when performing replicated operations
This commit is contained in:
parent 9def68a733
commit e48b1d98db
TransportShardReplicationOperationAction.java:

@@ -20,6 +20,7 @@
 package org.elasticsearch.action.support.replication;

 import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.PrimaryNotStartedActionException;
@@ -293,7 +294,13 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                 }

                 @Override public void handleException(RemoteTransportException exp) {
-                    listener.onFailure(exp);
+                    // if we got disconnected from the node, retry it...
+                    if (exp.unwrapCause() instanceof ConnectTransportException) {
+                        primaryOperationStarted.set(false);
+                        retryPrimary(fromClusterEvent, shard.shardId());
+                    } else {
+                        listener.onFailure(exp);
+                    }
                 }

                 @Override public boolean spawn() {
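The shape of the change above: a transport-level failure is no longer forwarded to the listener unconditionally; a lost connection is treated as retryable, while everything else still fails the request. A minimal, self-contained sketch of that decision, with plain JDK types standing in for the Elasticsearch transport classes (the names here are illustrative, not the project's API):

import java.net.ConnectException;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the "retry on disconnect, fail otherwise" decision.
public class RetryOnDisconnectSketch {

    interface Listener {
        void onFailure(Throwable t);
    }

    private final AtomicBoolean primaryOperationStarted = new AtomicBoolean(false);

    // Walk to the root cause, like RemoteTransportException#unwrapCause().
    static Throwable unwrapCause(Throwable t) {
        Throwable cause = t;
        while (cause.getCause() != null && cause.getCause() != cause) {
            cause = cause.getCause();
        }
        return cause;
    }

    void handleException(RuntimeException exp, Listener listener) {
        if (unwrapCause(exp) instanceof ConnectException) {
            // The node went away mid-operation: clear the "started" flag so
            // the primary operation can be re-attempted once routing settles.
            primaryOperationStarted.set(false);
            retryPrimary();
        } else {
            // Anything else is a real failure; surface it to the caller.
            listener.onFailure(exp);
        }
    }

    private void retryPrimary() {
        // In the real code this re-registers against cluster-state changes;
        // here it is only a placeholder for "try again later".
    }
}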
@@ -357,10 +364,12 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
             try {
                 Response response = shardOperationOnPrimary(new ShardOperationRequest(primaryShardId, request));
                 performReplicas(response, alreadyThreaded);
-            } catch (IndexShardNotStartedException e) {
-                // still in recovery, retry (we know that its not UNASSIGNED OR INITIALIZING since we are checking it in the calling method)
-                retryPrimary(fromDiscoveryListener, shard.shardId());
             } catch (Exception e) {
+                if (e instanceof IndexShardMissingException || e instanceof IndexShardNotStartedException
+                        || e instanceof IndexMissingException) {
+                    retryPrimary(fromDiscoveryListener, shard.shardId());
+                    return;
+                }
                 if (logger.isDebugEnabled()) {
                     logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", e);
                 }
@@ -545,13 +554,17 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
      * </ul>
      */
     private boolean ignoreReplicaException(Throwable e) {
-        if (e instanceof IllegalIndexShardStateException) {
+        Throwable cause = ExceptionsHelper.unwrapCause(e);
+        if (cause instanceof IllegalIndexShardStateException) {
             return true;
         }
-        if (e instanceof IndexMissingException) {
+        if (cause instanceof IndexMissingException) {
             return true;
         }
-        if (e instanceof IndexShardMissingException) {
+        if (cause instanceof IndexShardMissingException) {
             return true;
         }
+        if (cause instanceof ConnectTransportException) {
+            return true;
+        }
         return false;
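The key fix in ignoreReplicaException is unwrapping before classifying: a failure reported by a replica typically arrives wrapped in a transport-level exception, so instanceof checks against the outer exception never matched the shard-state causes they were written for. A small runnable sketch of the unwrap-then-classify idiom, with plain JDK exception types as stand-ins for the Elasticsearch ones:

import java.net.ConnectException;
import java.nio.file.NoSuchFileException;

// Illustrative only: classify a possibly-wrapped failure by its root cause.
public final class IgnoreReplicaExceptionSketch {

    // Walk the cause chain to the innermost throwable, like ExceptionsHelper.unwrapCause.
    static Throwable unwrapCause(Throwable t) {
        Throwable result = t;
        while (result.getCause() != null && result.getCause() != result) {
            result = result.getCause();
        }
        return result;
    }

    // Returns true when a replica-side failure should be ignored rather than
    // failing the whole replicated operation (stand-in exception types).
    static boolean ignoreReplicaException(Throwable e) {
        Throwable cause = unwrapCause(e);
        return cause instanceof NoSuchFileException   // shard/index gone
                || cause instanceof ConnectException; // node dropped out
    }

    public static void main(String[] args) {
        // A wrapped disconnect is now recognized; checking the outer exception was not enough.
        Throwable wrapped = new RuntimeException("remote failure",
                new ConnectException("node disconnected"));
        System.out.println(ignoreReplicaException(wrapped)); // true
    }
}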
NetworkExceptionHelper.java:

@@ -39,6 +39,7 @@ public class NetworkExceptionHelper {
             return true;
         }
         if (e.getMessage() != null) {
             // UGLY!, this exception messages seems to represent closed connection
             if (e.getMessage().contains("Connection reset by peer")) {
                 return true;
             }
@@ -48,6 +49,9 @@ public class NetworkExceptionHelper {
             if (e.getMessage().contains("forcibly closed")) {
                 return true;
             }
+            if (e.getMessage().contains("Broken pipe")) {
+                return true;
+            }
         }
         return false;
     }
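As the in-code comment concedes, matching on message text is ugly but hard to avoid: the JDK reports peer resets and broken pipes as generic IOExceptions whose only distinguishing feature is the message. A sketch of the idiom in isolation (a hypothetical helper mirroring the checks above, not the actual class):

import java.io.IOException;

// Illustrative sketch of message-based detection of closed connections.
public final class CloseDetectSketch {

    static boolean isCloseConnectionException(Throwable e) {
        String msg = e.getMessage();
        if (msg != null) {
            // These messages vary by OS and JDK, hence the substring checks.
            return msg.contains("Connection reset by peer")
                    || msg.contains("forcibly closed")
                    || msg.contains("Broken pipe");
        }
        return false;
    }

    public static void main(String[] args) {
        IOException e = new IOException("Broken pipe");
        // Treat as a benign remote close rather than a hard failure.
        System.out.println(isCloseConnectionException(e)); // true
    }
}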
RecoveryWhileUnderLoadTests.java:

@@ -43,9 +43,9 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
     }

     @Test public void recoverWhileUnderLoadAllocateBackupsTest() throws Exception {
-        startNode("server1");
+        startNode("node1");

-        client("server1").admin().indices().prepareCreate("test").execute().actionGet();
+        client("node1").admin().indices().prepareCreate("test").execute().actionGet();

         final AtomicLong idGenerator = new AtomicLong();
         final AtomicBoolean stop = new AtomicBoolean(false);
@@ -56,7 +56,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
                 @Override public void run() {
                     while (!stop.get()) {
                         long id = idGenerator.incrementAndGet();
-                        client("server1").prepareIndex("test", "type1", Long.toString(id))
+                        client("node1").prepareIndex("test", "type1", Long.toString(id))
                                 .setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
                     }
                     stopLatch.countDown();
@@ -65,46 +65,46 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
             writers[i].start();
         }

-        while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 20000) {
+        while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 20000) {
             Thread.sleep(100);
-            client("server1").admin().indices().prepareRefresh().execute().actionGet();
+            client("node1").admin().indices().prepareRefresh().execute().actionGet();
         }

         // now flush, just to make sure we have some data in the index, not just translog
-        client("server1").admin().indices().prepareFlush().execute().actionGet();
+        client("node1").admin().indices().prepareFlush().execute().actionGet();


-        while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 40000) {
+        while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 40000) {
             Thread.sleep(100);
-            client("server1").admin().indices().prepareRefresh().execute().actionGet();
+            client("node1").admin().indices().prepareRefresh().execute().actionGet();
         }

         // now start another node, while we index
         startNode("server2");

         // make sure the cluster state is green, and all has been recovered
-        assertThat(client("server1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
+        assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));


         // wait till we index 10,0000
-        while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 100000) {
+        while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 100000) {
             Thread.sleep(100);
-            client("server1").admin().indices().prepareRefresh().execute().actionGet();
+            client("node1").admin().indices().prepareRefresh().execute().actionGet();
         }

         stop.set(true);
         stopLatch.await();

-        client("server1").admin().indices().prepareRefresh().execute().actionGet();
+        client("node1").admin().indices().prepareRefresh().execute().actionGet();
         for (int i = 0; i < 10; i++) {
-            assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(idGenerator.get()));
+            assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(idGenerator.get()));
         }
     }

     @Test public void recoverWhileUnderLoadAllocateBackupsRelocatePrimariesTest() throws Exception {
-        startNode("server1");
+        startNode("node1");

-        client("server1").admin().indices().prepareCreate("test").execute().actionGet();
+        client("node1").admin().indices().prepareCreate("test").execute().actionGet();

         final AtomicLong idGenerator = new AtomicLong();
         final AtomicBoolean stop = new AtomicBoolean(false);
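A note on the idiom repeated throughout these tests: a count query only observes documents made searchable by a refresh, so the tests poll the count and force a refresh on each iteration rather than sleeping for a fixed time. In isolation, with hypothetical stand-ins for the client calls:

// Illustrative version of the test's polling idiom. countDocs() and refresh()
// are hypothetical stand-ins for the client calls used in the test.
public class WaitForDocsSketch {

    // Wait until at least `target` docs are visible, refreshing between checks
    // so newly indexed documents become searchable.
    static void waitForDocs(long target) throws InterruptedException {
        while (countDocs() < target) {
            Thread.sleep(100);
            refresh();
        }
    }

    static long countDocs() {
        return Long.MAX_VALUE; // stand-in for client("node1").prepareCount()...count()
    }

    static void refresh() {
        // stand-in for client("node1").admin().indices().prepareRefresh()...
    }
}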
@@ -115,7 +115,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
                 @Override public void run() {
                     while (!stop.get()) {
                         long id = idGenerator.incrementAndGet();
-                        client("server1").prepareIndex("test", "type1", Long.toString(id))
+                        client("node1").prepareIndex("test", "type1", Long.toString(id))
                                 .setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
                     }
                     stopLatch.countDown();
@@ -124,39 +124,109 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
             writers[i].start();
         }

-        while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 20000) {
+        while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 20000) {
             Thread.sleep(100);
-            client("server1").admin().indices().prepareRefresh().execute().actionGet();
+            client("node1").admin().indices().prepareRefresh().execute().actionGet();
         }

         // now flush, just to make sure we have some data in the index, not just translog
-        client("server1").admin().indices().prepareFlush().execute().actionGet();
+        client("node1").admin().indices().prepareFlush().execute().actionGet();


-        while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 40000) {
+        while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 40000) {
             Thread.sleep(100);
-            client("server1").admin().indices().prepareRefresh().execute().actionGet();
+            client("node1").admin().indices().prepareRefresh().execute().actionGet();
         }

         // now start another node, while we index
-        startNode("server2");
-        startNode("server3");
-        startNode("server4");
+        startNode("node2");
+        startNode("node3");
+        startNode("node4");

-        assertThat(client("server1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
+        assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));


-        while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 150000) {
+        while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 150000) {
             Thread.sleep(100);
-            client("server1").admin().indices().prepareRefresh().execute().actionGet();
+            client("node1").admin().indices().prepareRefresh().execute().actionGet();
         }

         stop.set(true);
         stopLatch.await();

-        client("server1").admin().indices().prepareRefresh().execute().actionGet();
+        client("node1").admin().indices().prepareRefresh().execute().actionGet();
         for (int i = 0; i < 10; i++) {
-            assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(idGenerator.get()));
+            assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(idGenerator.get()));
         }
     }
+
+    @Test public void recoverWhileUnderLoadWithNodeShutdown() throws Exception {
+        startNode("node1");
+        startNode("node2");
+
+        client("node1").admin().indices().prepareCreate("test").execute().actionGet();
+
+        final AtomicLong idGenerator = new AtomicLong();
+        final AtomicBoolean stop = new AtomicBoolean(false);
+        Thread[] writers = new Thread[5];
+        final CountDownLatch stopLatch = new CountDownLatch(writers.length);
+        for (int i = 0; i < writers.length; i++) {
+            writers[i] = new Thread() {
+                @Override public void run() {
+                    try {
+                        while (!stop.get()) {
+                            long id = idGenerator.incrementAndGet();
+                            client("node2").prepareIndex("test", "type1", Long.toString(id))
+                                    .setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
+                        }
+                    } finally {
+                        stopLatch.countDown();
+                    }
+                }
+            };
+            writers[i].start();
+        }
+
+        while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 20000) {
+            Thread.sleep(100);
+            client("node1").admin().indices().prepareRefresh().execute().actionGet();
+        }
+
+        // now flush, just to make sure we have some data in the index, not just translog
+        client("node1").admin().indices().prepareFlush().execute().actionGet();
+
+
+        while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 40000) {
+            Thread.sleep(100);
+            client("node1").admin().indices().prepareRefresh().execute().actionGet();
+        }
+
+        // now start more nodes, while we index
+        startNode("node3");
+        startNode("node4");
+
+        assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
+
+
+        while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 80000) {
+            Thread.sleep(100);
+            client("node1").admin().indices().prepareRefresh().execute().actionGet();
+        }
+
+        // now, shutdown nodes
+        closeNode("node1");
+        assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
+        closeNode("node3");
+        assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
+        closeNode("node4");
+        assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.YELLOW));
+
+        stop.set(true);
+        stopLatch.await();
+
+        client("node2").admin().indices().prepareRefresh().execute().actionGet();
+        for (int i = 0; i < 10; i++) {
+            assertThat(client("node2").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(idGenerator.get()));
+        }
+    }
 }
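One detail worth noting in the new recoverWhileUnderLoadWithNodeShutdown test above: unlike the earlier tests, its writer threads count the stop latch down in a finally block, so a writer killed by an unexpected exception while nodes are shutting down still releases stopLatch.await() instead of hanging the test. The pattern in isolation:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal demonstration of the stop-flag + latch shutdown pattern used by the test.
public class WriterShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean stop = new AtomicBoolean(false);
        CountDownLatch stopLatch = new CountDownLatch(1);

        Thread writer = new Thread(() -> {
            try {
                while (!stop.get()) {
                    // index a document here; this may throw while nodes restart
                }
            } finally {
                // always release the latch, even if the loop body threw
                stopLatch.countDown();
            }
        });
        writer.start();

        stop.set(true);    // ask the writer to finish
        stopLatch.await(); // never hangs, thanks to the finally block
    }
}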