recovery state: fix concurrent access to file list

A shard recovery response might serialize a shard state at the same time that it is
modified by the recovery process. The test
RelocationTests.testMoveShardsWhileRelocation
failed because of this with a ConcurrentModificationException.

closes #10381
This commit is contained in:
Britta Weber 2015-04-01 21:50:18 +02:00
parent 2beda3953d
commit 82071659de
2 changed files with 28 additions and 2 deletions

View File

@ -858,7 +858,7 @@ public class RecoveryState implements ToXContent, Streamable {
}
@Override
public void readFrom(StreamInput in) throws IOException {
public synchronized void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
@ -870,7 +870,7 @@ public class RecoveryState implements ToXContent, Streamable {
}
@Override
public void writeTo(StreamOutput out) throws IOException {
public synchronized void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
final File[] files = fileDetails.values().toArray(new File[0]);
out.writeVInt(files.length);

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryState.*;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
@ -481,4 +482,29 @@ public class RecoveryStateTest extends ElasticsearchTestCase {
assertThat(lastRead.time(), lessThanOrEqualTo(start.time()));
}
}
@Test
public void testConcurrentModificationIndexFileDetailsMap() throws InterruptedException {
final Index index = new Index();
final AtomicBoolean stop = new AtomicBoolean(false);
Streamer<Index> readWriteIndex = new Streamer<Index>(stop, index) {
@Override
Index createObj() {
return new Index();
}
};
Thread modifyThread = new Thread() {
public void run() {
for (int i = 0; i < 1000; i++) {
index.addFileDetail(randomAsciiOfLength(10), 100, true);
}
stop.set(true);
}
};
readWriteIndex.start();
modifyThread.start();
modifyThread.join();
readWriteIndex.join();
assertThat(readWriteIndex.error.get(), equalTo(null));
}
}