Fix TransportNodesListShardStoreMetaData for custom data paths
Cleans up the testReusePeerRecovery test as well. The actual fix is in TransportNodesListShardStoreMetaData.java, which needs to use `nodeEnv.shardDataPaths` instead of `nodeEnv.shardPaths` so that shards with custom data paths are found. Because this was difficult to track down, a lot of additional trace logging has been added. This also fixes a logging issue in GatewayAllocator.
This commit is contained in:
parent
904f20a41b
commit
31652a8b3d
|
@ -330,6 +330,8 @@ public class GatewayAllocator extends AbstractComponent {
|
|||
sizeMatched += storeFileMetaData.length();
|
||||
}
|
||||
}
|
||||
logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data",
|
||||
shard, discoNode.name(), new ByteSizeValue(sizeMatched), sizeMatched);
|
||||
if (sizeMatched > lastSizeMatched) {
|
||||
lastSizeMatched = sizeMatched;
|
||||
lastDiscoNodeMatched = discoNode;
|
||||
|
@ -345,7 +347,7 @@ public class GatewayAllocator extends AbstractComponent {
|
|||
// we only check on THROTTLE since we checked before on NO
|
||||
Decision decision = allocation.deciders().canAllocate(shard, lastNodeMatched, allocation);
|
||||
if (decision.type() == Decision.Type.THROTTLE) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
|
||||
}
|
||||
// we are throttling this, but we have enough to allocate to this node, ignore it for now
|
||||
|
@ -411,6 +413,8 @@ public class GatewayAllocator extends AbstractComponent {
|
|||
|
||||
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : response) {
|
||||
// -1 version means it does not exists, which is what the API returns, and what we expect to
|
||||
logger.trace("[{}] on node [{}] has version [{}] of shard",
|
||||
shard, nodeShardState.getNode(), nodeShardState.version());
|
||||
shardStates.put(nodeShardState.getNode(), nodeShardState.version());
|
||||
}
|
||||
return shardStates;
|
||||
|
|
|
@ -244,7 +244,9 @@ public abstract class MetaDataStateFormat<T> {
|
|||
maxVersion = Math.max(maxVersion, version);
|
||||
final boolean legacy = MetaDataStateFormat.STATE_FILE_EXTENSION.equals(matcher.group(2)) == false;
|
||||
maxVersionIsLegacy &= legacy; // on purpose, see NOTE below
|
||||
files.add(new PathAndVersion(stateFile, version, legacy));
|
||||
PathAndVersion pav = new PathAndVersion(stateFile, version, legacy);
|
||||
logger.trace("found state file: {}", pav);
|
||||
files.add(pav);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -275,6 +277,7 @@ public abstract class MetaDataStateFormat<T> {
|
|||
}
|
||||
} else {
|
||||
state = format.read(stateFile, version);
|
||||
logger.trace("state version [{}] read from [{}]", version, stateFile.getFileName());
|
||||
}
|
||||
return state;
|
||||
} catch (Throwable e) {
|
||||
|
@ -324,6 +327,10 @@ public abstract class MetaDataStateFormat<T> {
|
|||
this.version = version;
|
||||
this.legacy = legacy;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "[version:" + version + ", legacy:" + legacy + ", file:" + file.toAbsolutePath() + "]";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -165,7 +165,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
|
|||
if (!storeType.contains("fs")) {
|
||||
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
|
||||
}
|
||||
Path[] shardLocations = nodeEnv.shardPaths(shardId);
|
||||
Path[] shardLocations = nodeEnv.shardDataPaths(shardId, metaData.settings());
|
||||
Path[] shardIndexLocations = new Path[shardLocations.length];
|
||||
for (int i = 0; i < shardLocations.length; i++) {
|
||||
shardIndexLocations[i] = shardLocations[i].resolve("index");
|
||||
|
|
|
@ -354,7 +354,8 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest {
|
|||
// prevent any rebalance actions during the peer recovery
|
||||
// if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if
|
||||
// we reuse the files on disk after full restarts for replicas.
|
||||
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(indexSettings()).put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)));
|
||||
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(indexSettings())));
|
||||
ensureGreen();
|
||||
logger.info("--> indexing docs");
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
client().prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
|
||||
|
@ -368,44 +369,48 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest {
|
|||
logger.info("Running Cluster Health");
|
||||
ensureGreen();
|
||||
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).setMaxNumSegments(100).get(); // just wait for merges
|
||||
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).execute().actionGet();
|
||||
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();
|
||||
|
||||
logger.info("--> shutting down the nodes");
|
||||
logger.info("--> disabling allocation while the cluster is shut down");
|
||||
|
||||
// Disable allocations while we are closing nodes
|
||||
client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(settingsBuilder()
|
||||
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE))
|
||||
.get();
|
||||
logger.info("--> full cluster restart");
|
||||
internalCluster().fullRestart();
|
||||
|
||||
logger.info("Running Cluster Health");
|
||||
logger.info("--> waiting for cluster to return to green after first shutdown");
|
||||
ensureGreen();
|
||||
logger.info("--> shutting down the nodes");
|
||||
|
||||
logger.info("--> disabling allocation while the cluster is shut down second time");
|
||||
// Disable allocations while we are closing nodes
|
||||
client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(settingsBuilder()
|
||||
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE))
|
||||
.get();
|
||||
logger.info("--> full cluster restart");
|
||||
internalCluster().fullRestart();
|
||||
|
||||
|
||||
logger.info("Running Cluster Health");
|
||||
logger.info("--> waiting for cluster to return to green after second shutdown");
|
||||
ensureGreen();
|
||||
|
||||
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
|
||||
|
||||
for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) {
|
||||
RecoveryState recoveryState = response.recoveryState();
|
||||
if (!recoveryState.getPrimary()) {
|
||||
logger.info("--> shard {}, recovered {}, reuse {}", response.getShardId(), recoveryState.getIndex().recoveredTotalSize(), recoveryState.getIndex().reusedByteCount());
|
||||
assertThat(recoveryState.getIndex().recoveredByteCount(), equalTo(0l));
|
||||
assertThat(recoveryState.getIndex().reusedByteCount(), greaterThan(0l));
|
||||
assertThat(recoveryState.getIndex().reusedByteCount(), equalTo(recoveryState.getIndex().totalByteCount()));
|
||||
assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(0));
|
||||
assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));
|
||||
assertThat(recoveryState.getIndex().reusedFileCount(), greaterThan(0));
|
||||
assertThat(recoveryState.getIndex().reusedByteCount(), greaterThan(recoveryState.getIndex().numberOfRecoveredBytes()));
|
||||
logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}",
|
||||
response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(),
|
||||
recoveryState.getIndex().recoveredTotalSize(), recoveryState.getIndex().reusedByteCount());
|
||||
assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredByteCount(), equalTo(0l));
|
||||
assertThat("data should have been reused", recoveryState.getIndex().reusedByteCount(), greaterThan(0l));
|
||||
assertThat("all bytes should be reused", recoveryState.getIndex().reusedByteCount(), equalTo(recoveryState.getIndex().totalByteCount()));
|
||||
assertThat("no files should be recovered", recoveryState.getIndex().recoveredFileCount(), equalTo(0));
|
||||
assertThat("all files should be reused", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));
|
||||
assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
|
||||
assertThat("all bytes should be reused bytes",
|
||||
recoveryState.getIndex().reusedByteCount(), greaterThan(recoveryState.getIndex().numberOfRecoveredBytes()));
|
||||
} else {
|
||||
assertThat(recoveryState.getIndex().recoveredByteCount(), equalTo(recoveryState.getIndex().reusedByteCount()));
|
||||
assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(recoveryState.getIndex().reusedFileCount()));
|
||||
|
|
Loading…
Reference in New Issue