Merge branch 'master' into feature-suggest-refactoring

This commit is contained in:
Christoph Büscher 2016-02-23 16:47:13 -08:00
commit 78534847a3
36 changed files with 1468 additions and 2006 deletions

View File

@ -54,7 +54,7 @@ Once your changes and tests are ready to submit for review:
1. Test your changes
Run the test suite to make sure that nothing is broken. See the
[TESTING](TESTING.asciidoc) file for help running tests.
[TESTING](../TESTING.asciidoc) file for help running tests.
2. Sign the Contributor License Agreement

34
.github/ISSUE_TEMPLATE.md vendored Normal file
View File

@ -0,0 +1,34 @@
<!--
GitHub is reserved for bug reports and feature requests. The best place
to ask a general question is at the Elastic Discourse forums at
https://discuss.elastic.co. If you are in fact posting a bug report or
a feature request, please include one and only one of the below blocks
in your new issue.
-->
<!--
If you are filing a bug report, please remove the below feature
request block and provide responses for all of the below items.
-->
**Elasticsearch version**:
**JVM version**:
**OS version**:
**Description of the problem including expected versus actual behavior**:
**Steps to reproduce**:
1.
2.
3.
**Provide logs (if relevant)**:
<!--
If you are filing a feature request, please remove the above bug
report block and provide responses for all of the below items.
-->
**Describe the feature**:

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.action.update.UpdateRequest;
@ -111,7 +112,37 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
Translog.Location location = null;
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
BulkItemRequest item = request.items()[requestIndex];
location = handleItem(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
}
processAfterWrite(request.refresh(), indexShard, location);
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
BulkItemRequest[] items = request.items();
for (int i = 0; i < items.length; i++) {
responses[i] = items[i].getPrimaryResponse();
}
return new Tuple<>(new BulkShardResponse(request.shardId(), responses), request);
}
private Translog.Location handleItem(MetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
if (item.request() instanceof IndexRequest) {
location = index(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
} else if (item.request() instanceof DeleteRequest) {
location = delete(request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
} else if (item.request() instanceof UpdateRequest) {
Tuple<Translog.Location, BulkItemRequest> tuple = update(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
location = tuple.v1();
item = tuple.v2();
} else {
throw new IllegalStateException("Unexpected index operation: " + item.request());
}
assert item.getPrimaryResponse() != null;
assert preVersionTypes[requestIndex] != null;
return location;
}
private Translog.Location index(MetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
IndexRequest indexRequest = (IndexRequest) item.request();
preVersions[requestIndex] = indexRequest.version();
preVersionTypes[requestIndex] = indexRequest.versionType();
@ -130,11 +161,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
}
throw (ElasticsearchException) e;
}
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
logger.trace("{} failed to execute bulk item (index) {}", e, request.shardId(), indexRequest);
} else {
logger.debug("{} failed to execute bulk item (index) {}", e, request.shardId(), indexRequest);
}
logFailure(e, "index", request.shardId(), indexRequest);
// if its a conflict failure, and we already executed the request on a primary (and we execute it
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
// then just use the response we got from the successful execution
@ -145,7 +172,18 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e)));
}
}
} else if (item.request() instanceof DeleteRequest) {
return location;
}
private <ReplicationRequestT extends ReplicationRequest<ReplicationRequestT>> void logFailure(Throwable e, String operation, ShardId shardId, ReplicationRequest<ReplicationRequestT> request) {
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
logger.trace("{} failed to execute bulk item ({}) {}", e, shardId, operation, request);
} else {
logger.debug("{} failed to execute bulk item ({}) {}", e, shardId, operation, request);
}
}
private Translog.Location delete(BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
preVersions[requestIndex] = deleteRequest.version();
preVersionTypes[requestIndex] = deleteRequest.versionType();
@ -165,11 +203,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
}
throw (ElasticsearchException) e;
}
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
logger.trace("{} failed to execute bulk item (delete) {}", e, request.shardId(), deleteRequest);
} else {
logger.debug("{} failed to execute bulk item (delete) {}", e, request.shardId(), deleteRequest);
}
logFailure(e, "delete", request.shardId(), deleteRequest);
// if its a conflict failure, and we already executed the request on a primary (and we execute it
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
// then just use the response we got from the successful execution
@ -180,7 +214,10 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e)));
}
}
} else if (item.request() instanceof UpdateRequest) {
return location;
}
private Tuple<Translog.Location, BulkItemRequest> update(MetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
UpdateRequest updateRequest = (UpdateRequest) item.request();
preVersions[requestIndex] = updateRequest.version();
preVersionTypes[requestIndex] = updateRequest.versionType();
@ -258,21 +295,13 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
case UPSERT:
case INDEX:
IndexRequest indexRequest = updateResult.request();
if (ExceptionsHelper.status(t) == RestStatus.CONFLICT) {
logger.trace("{} failed to execute bulk item (index) {}", t, request.shardId(), indexRequest);
} else {
logger.debug("{} failed to execute bulk item (index) {}", t, request.shardId(), indexRequest);
}
logFailure(t, "index", request.shardId(), indexRequest);
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), t)));
break;
case DELETE:
DeleteRequest deleteRequest = updateResult.request();
if (ExceptionsHelper.status(t) == RestStatus.CONFLICT) {
logger.trace("{} failed to execute bulk item (delete) {}", t, request.shardId(), deleteRequest);
} else {
logger.debug("{} failed to execute bulk item (delete) {}", t, request.shardId(), deleteRequest);
}
logFailure(t, "delete", request.shardId(), deleteRequest);
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE,
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), t)));
break;
@ -284,21 +313,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
}
}
} else {
throw new IllegalStateException("Unexpected index operation: " + item.request());
}
assert item.getPrimaryResponse() != null;
assert preVersionTypes[requestIndex] != null;
}
processAfterWrite(request.refresh(), indexShard, location);
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
BulkItemRequest[] items = request.items();
for (int i = 0; i < items.length; i++) {
responses[i] = items[i].getPrimaryResponse();
}
return new Tuple<>(new BulkShardResponse(request.shardId(), responses), request);
return Tuple.tuple(location, item);
}
private void setResponse(BulkItemRequest request, BulkItemResponse response) {

View File

@ -215,6 +215,11 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
validationException = addValidationError("ttl must not be negative", validationException);
}
}
if (id != null && id.getBytes(StandardCharsets.UTF_8).length > 512) {
validationException = addValidationError("id is too long, must be no longer than 512 bytes but was: " +
id.getBytes(StandardCharsets.UTF_8).length, validationException);
}
return validationException;
}

View File

@ -83,6 +83,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
@ -461,43 +462,37 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
protected void doRun() {
setPhase(task, "routing");
final ClusterState state = observer.observedState();
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel());
if (blockException != null) {
handleBlockException(blockException);
return;
}
final String concreteIndex = resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request) : request.index();
blockException = state.blocks().indexBlockedException(indexBlockLevel(), concreteIndex);
if (blockException != null) {
handleBlockException(blockException);
if (handleBlockExceptions(state)) {
return;
}
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
final String concreteIndex = concreteIndex(state);
resolveRequest(state.metaData(), concreteIndex, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest";
IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
final ShardRouting primary = indexShard.primaryShard();
if (primary == null || primary.active() == false) {
logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), actionName, request, state.version());
retryBecauseUnavailable(request.shardId(), "primary shard is not active");
return;
}
if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
final ShardRouting primary = primary(state);
if (retryIfUnavailable(state, primary)) {
return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
taskManager.registerChildTask(task, node.getId());
if (primary.currentNodeId().equals(state.nodes().localNodeId())) {
performLocalAction(state, primary, node);
} else {
performRemoteAction(state, primary, node);
}
}
private void performLocalAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
setPhase(task, "waiting_on_primary");
if (logger.isTraceEnabled()) {
logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}] ", transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
}
performAction(node, transportPrimaryAction, true);
} else {
}
private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
if (state.version() < request.routedBasedOnClusterVersion()) {
logger.trace("failed to find primary [{}] for request [{}] despite sender thinking it would be here. Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...", request.shardId(), request, state.version(), request.routedBasedOnClusterVersion());
retryBecauseUnavailable(request.shardId(), "failed to find primary as current cluster state with version [" + state.version() + "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]");
@ -513,6 +508,42 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
setPhase(task, "rerouted");
performAction(node, actionName, false);
}
private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
if (primary == null || primary.active() == false) {
logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), actionName, request, state.version());
retryBecauseUnavailable(request.shardId(), "primary shard is not active");
return true;
}
if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
return true;
}
return false;
}
private String concreteIndex(ClusterState state) {
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request) : request.index();
}
private ShardRouting primary(ClusterState state) {
IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
return indexShard.primaryShard();
}
private boolean handleBlockExceptions(ClusterState state) {
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel());
if (blockException != null) {
handleBlockException(blockException);
return true;
}
blockException = state.blocks().indexBlockedException(indexBlockLevel(), concreteIndex(state));
if (blockException != null) {
handleBlockException(blockException);
return true;
}
return false;
}
private void handleBlockException(ClusterBlockException blockException) {
@ -677,6 +708,14 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
// closed in finishAsFailed(e) in the case of error
indexShardReference = getIndexShardReferenceOnPrimary(shardId);
if (indexShardReference.isRelocated() == false) {
executeLocally();
} else {
executeRemotely();
}
}
private void executeLocally() throws Exception {
// execute locally
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
if (logger.isTraceEnabled()) {
@ -684,7 +723,9 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
ReplicationPhase replicationPhase = new ReplicationPhase(task, primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference);
finishAndMoveToReplication(replicationPhase);
} else {
}
private void executeRemotely() {
// delegate primary phase to relocation target
// it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
// phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
@ -696,7 +737,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
TransportChannelResponseHandler.responseHandler(logger, TransportReplicationAction.this::newResponseInstance, channel,
"rerouting indexing to target primary " + primary));
}
}
/**
* checks whether we can perform a write based on the write consistency setting
@ -835,23 +875,48 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
// If the index gets deleted after primary operation, we skip replication
final ClusterState state = clusterService.state();
final IndexRoutingTable index = state.getRoutingTable().index(shardId.getIndex());
final IndexShardRoutingTable shardRoutingTable = (index != null) ? index.shard(shardId.id()) : null;
final IndexShardRoutingTable shardRoutingTable = state.getRoutingTable().shardRoutingTableOrNull(shardId);
final IndexMetaData indexMetaData = state.getMetaData().index(shardId.getIndex());
this.shards = (shardRoutingTable != null) ? shardRoutingTable.shards() : Collections.emptyList();
this.executeOnReplica = (indexMetaData == null) || shouldExecuteReplication(indexMetaData.getSettings());
this.nodes = state.getNodes();
List<ShardRouting> shards = shards(shardRoutingTable);
boolean executeOnReplica = (indexMetaData == null) || shouldExecuteReplication(indexMetaData.getSettings());
DiscoveryNodes nodes = state.getNodes();
if (shards.isEmpty()) {
logger.debug("replication phase for request [{}] on [{}] is skipped due to index deletion after primary operation", replicaRequest, shardId);
}
// we calculate number of target nodes to send replication operations, including nodes with relocating shards
AtomicInteger numberOfPendingShardInstances = new AtomicInteger();
this.totalShards = countTotalAndPending(shards, executeOnReplica, nodes, numberOfPendingShardInstances);
this.pending = numberOfPendingShardInstances;
this.shards = shards;
this.executeOnReplica = executeOnReplica;
this.nodes = nodes;
if (logger.isTraceEnabled()) {
logger.trace("replication phase started. pending [{}], action [{}], request [{}], cluster state version used [{}]", pending.get(),
transportReplicaAction, replicaRequest, state.version());
}
}
private int countTotalAndPending(List<ShardRouting> shards, boolean executeOnReplica, DiscoveryNodes nodes, AtomicInteger pending) {
assert pending.get() == 0;
int numberOfIgnoredShardInstances = performOnShards(shards, executeOnReplica, nodes, shard -> pending.incrementAndGet(), shard -> pending.incrementAndGet());
// one for the local primary copy
return 1 + numberOfIgnoredShardInstances + pending.get();
}
private int performOnShards(List<ShardRouting> shards, boolean executeOnReplica, DiscoveryNodes nodes, Consumer<ShardRouting> onLocalShard, Consumer<ShardRouting> onRelocatingShard) {
int numberOfIgnoredShardInstances = 0;
int numberOfPendingShardInstances = 0;
for (ShardRouting shard : shards) {
// the following logic to select the shards to replicate to is mirrored and explained in the doRun method below
if (shard.primary() == false && executeOnReplica == false) {
// If the replicas use shadow replicas, there is no reason to
// perform the action on the replica, so skip it and
// immediately return
// this delays mapping updates on replicas because they have
// to wait until they get the new mapping through the cluster
// state, which is why we recommend pre-defined mappings for
// indices using shadow replicas
numberOfIgnoredShardInstances++;
continue;
}
@ -859,20 +924,26 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
numberOfIgnoredShardInstances++;
continue;
}
// we index on a replica that is initializing as well since we might not have got the event
// yet that it was started. We will get an exception IllegalShardState exception if its not started
// and that's fine, we will ignore it
// we never execute replication operation locally as primary operation has already completed locally
// hence, we ignore any local shard for replication
if (nodes.localNodeId().equals(shard.currentNodeId()) == false) {
numberOfPendingShardInstances++;
onLocalShard.accept(shard);
}
// send operation to relocating shard
// local shard can be a relocation target of a primary that is in relocated state
if (shard.relocating() && nodes.localNodeId().equals(shard.relocatingNodeId()) == false) {
numberOfPendingShardInstances++;
onRelocatingShard.accept(shard);
}
}
// one for the local primary copy
this.totalShards = 1 + numberOfPendingShardInstances + numberOfIgnoredShardInstances;
this.pending = new AtomicInteger(numberOfPendingShardInstances);
if (logger.isTraceEnabled()) {
logger.trace("replication phase started. pending [{}], action [{}], request [{}], cluster state version used [{}]", pending.get(),
transportReplicaAction, replicaRequest, state.version());
return numberOfIgnoredShardInstances;
}
private List<ShardRouting> shards(IndexShardRoutingTable shardRoutingTable) {
return (shardRoutingTable != null) ? shardRoutingTable.shards() : Collections.emptyList();
}
/**
@ -912,36 +983,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
doFinish();
return;
}
for (ShardRouting shard : shards) {
if (shard.primary() == false && executeOnReplica == false) {
// If the replicas use shadow replicas, there is no reason to
// perform the action on the replica, so skip it and
// immediately return
// this delays mapping updates on replicas because they have
// to wait until they get the new mapping through the cluster
// state, which is why we recommend pre-defined mappings for
// indices using shadow replicas
continue;
}
if (shard.unassigned()) {
continue;
}
// we index on a replica that is initializing as well since we might not have got the event
// yet that it was started. We will get an exception IllegalShardState exception if its not started
// and that's fine, we will ignore it
// we never execute replication operation locally as primary operation has already completed locally
// hence, we ignore any local shard for replication
if (nodes.localNodeId().equals(shard.currentNodeId()) == false) {
performOnReplica(shard);
}
// send operation to relocating shard
// local shard can be a relocation target of a primary that is in relocated state
if (shard.relocating() && nodes.localNodeId().equals(shard.relocatingNodeId()) == false) {
performOnReplica(shard.buildTargetRelocatingShard());
}
}
performOnShards(shards, executeOnReplica, nodes, shard -> performOnReplica(shard), shard -> performOnReplica(shard.buildTargetRelocatingShard()));
}
/**

View File

@ -21,11 +21,13 @@ package org.elasticsearch.bootstrap;
import org.apache.lucene.util.Constants;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.monitor.jvm.JvmInfo;
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/** Checks that the JVM is ok and won't cause index corruption */
final class JVMCheck {
@ -42,10 +44,46 @@ final class JVMCheck {
*/
static final String JVM_BYPASS = "es.bypass.vm.check";
/**
* Metadata and messaging for checking and reporting HotSpot
* issues.
*/
interface HotSpotCheck {
/**
* If this HotSpot check should be executed.
*
* @return true if this HotSpot check should be executed
*/
boolean check();
/**
* The error message to display when this HotSpot issue is
* present.
*
* @return the error message for this HotSpot issue
*/
String getErrorMessage();
/**
* The warning message for this HotSpot issue if a workaround
* exists and is used.
*
* @return the warning message for this HotSpot issue
*/
Optional<String> getWarningMessage();
/**
* The workaround for this HotSpot issue, if one exists.
*
* @return the workaround for this HotSpot issue, if one exists
*/
Optional<String> getWorkaround();
}
/**
* Metadata and messaging for hotspot bugs.
*/
static final class HotspotBug {
static class HotspotBug implements HotSpotCheck {
/** OpenJDK bug URL */
final String bugUrl;
@ -59,7 +97,7 @@ final class JVMCheck {
}
/** Returns an error message to the user for a broken version */
String getErrorMessage() {
public String getErrorMessage() {
StringBuilder sb = new StringBuilder();
sb.append("Java version: ").append(fullVersion());
sb.append(" suffers from critical bug ").append(bugUrl);
@ -78,7 +116,7 @@ final class JVMCheck {
}
/** Warns the user when a workaround is being used to dodge the bug */
String getWarningMessage() {
public Optional<String> getWarningMessage() {
StringBuilder sb = new StringBuilder();
sb.append("Workaround flag ").append(workAround);
sb.append(" for bug ").append(bugUrl);
@ -88,15 +126,53 @@ final class JVMCheck {
sb.append(System.lineSeparator());
sb.append("Upgrading is preferred, see ").append(JVM_RECOMMENDATIONS);
sb.append(" for current recommendations.");
return Optional.of(sb.toString());
}
public boolean check() {
return true;
}
@Override
public Optional<String> getWorkaround() {
return Optional.of(workAround);
}
}
static class G1GCCheck implements HotSpotCheck {
@Override
public boolean check() {
return JvmInfo.jvmInfo().useG1GC().equals("true");
}
/** Returns an error message to the user for a broken version */
public String getErrorMessage() {
StringBuilder sb = new StringBuilder();
sb.append("Java version: ").append(fullVersion());
sb.append(" can cause data corruption");
sb.append(" when used with G1GC.");
sb.append(System.lineSeparator());
sb.append("Please upgrade the JVM, see ").append(JVM_RECOMMENDATIONS);
sb.append(" for current recommendations.");
return sb.toString();
}
@Override
public Optional<String> getWarningMessage() {
return Optional.empty();
}
@Override
public Optional<String> getWorkaround() {
return Optional.empty();
}
}
/** mapping of hotspot version to hotspot bug information for the most serious bugs */
static final Map<String,HotspotBug> JVM_BROKEN_HOTSPOT_VERSIONS;
static final Map<String, HotSpotCheck> JVM_BROKEN_HOTSPOT_VERSIONS;
static {
Map<String,HotspotBug> bugs = new HashMap<>();
Map<String, HotSpotCheck> bugs = new HashMap<>();
// 1.7.0: loop optimizer bug
bugs.put("21.0-b17", new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-7070134", "-XX:-UseLoopPredicate"));
@ -104,6 +180,12 @@ final class JVMCheck {
bugs.put("24.0-b56", new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-8024830", "-XX:-UseSuperWord"));
bugs.put("24.45-b08", new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-8024830", "-XX:-UseSuperWord"));
bugs.put("24.51-b03", new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-8024830", "-XX:-UseSuperWord"));
G1GCCheck g1GcCheck = new G1GCCheck();
bugs.put("25.0-b70", g1GcCheck);
bugs.put("25.11-b03", g1GcCheck);
bugs.put("25.20-b23", g1GcCheck);
bugs.put("25.25-b02", g1GcCheck);
bugs.put("25.31-b07", g1GcCheck);
JVM_BROKEN_HOTSPOT_VERSIONS = Collections.unmodifiableMap(bugs);
}
@ -115,10 +197,10 @@ final class JVMCheck {
if (Boolean.parseBoolean(System.getProperty(JVM_BYPASS))) {
Loggers.getLogger(JVMCheck.class).warn("bypassing jvm version check for version [{}], this can result in data corruption!", fullVersion());
} else if ("Oracle Corporation".equals(Constants.JVM_VENDOR)) {
HotspotBug bug = JVM_BROKEN_HOTSPOT_VERSIONS.get(Constants.JVM_VERSION);
if (bug != null) {
if (bug.workAround != null && ManagementFactory.getRuntimeMXBean().getInputArguments().contains(bug.workAround)) {
Loggers.getLogger(JVMCheck.class).warn(bug.getWarningMessage());
HotSpotCheck bug = JVM_BROKEN_HOTSPOT_VERSIONS.get(Constants.JVM_VERSION);
if (bug != null && bug.check()) {
if (bug.getWorkaround().isPresent() && ManagementFactory.getRuntimeMXBean().getInputArguments().contains(bug.getWorkaround().get())) {
Loggers.getLogger(JVMCheck.class).warn(bug.getWarningMessage().get());
} else {
throw new RuntimeException(bug.getErrorMessage());
}

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.joda.DateMathParser;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndexClosedException;
import org.joda.time.DateTimeZone;
@ -527,29 +526,35 @@ public class IndexNameExpressionResolver extends AbstractComponent {
return expressions;
}
if (expressions.isEmpty() || (expressions.size() == 1 && (MetaData.ALL.equals(expressions.get(0))) || Regex.isMatchAllPattern(expressions.get(0)))) {
if (options.expandWildcardsOpen() && options.expandWildcardsClosed()) {
return Arrays.asList(metaData.concreteAllIndices());
} else if (options.expandWildcardsOpen()) {
return Arrays.asList(metaData.concreteAllOpenIndices());
} else if (options.expandWildcardsClosed()) {
return Arrays.asList(metaData.concreteAllClosedIndices());
} else {
return Collections.emptyList();
}
if (isEmptyOrTrivialWildcard(expressions)) {
return resolveEmptyOrTrivialWildcard(options, metaData, true);
}
Set<String> result = innerResolve(context, expressions, options, metaData);
if (result == null) {
return expressions;
}
if (result.isEmpty() && !options.allowNoIndices()) {
IndexNotFoundException infe = new IndexNotFoundException((String)null);
infe.setResources("index_or_alias", expressions.toArray(new String[0]));
throw infe;
}
return new ArrayList<>(result);
}
private Set<String> innerResolve(Context context, List<String> expressions, IndicesOptions options, MetaData metaData) {
Set<String> result = null;
for (int i = 0; i < expressions.size(); i++) {
String expression = expressions.get(i);
if (metaData.getAliasAndIndexLookup().containsKey(expression)) {
if (aliasOrIndexExists(metaData, expression)) {
if (result != null) {
result.add(expression);
}
continue;
}
if (Strings.isEmpty(expression)) {
throw new IndexNotFoundException(expression);
throw infe(expression);
}
boolean add = true;
if (expression.charAt(0) == '+') {
@ -557,32 +562,19 @@ public class IndexNameExpressionResolver extends AbstractComponent {
if (i == 0) {
result = new HashSet<>();
}
add = true;
expression = expression.substring(1);
} else if (expression.charAt(0) == '-') {
// if its the first, fill it with all the indices...
if (i == 0) {
String[] concreteIndices;
if (options.expandWildcardsOpen() && options.expandWildcardsClosed()) {
concreteIndices = metaData.concreteAllIndices();
} else if (options.expandWildcardsOpen()) {
concreteIndices = metaData.concreteAllOpenIndices();
} else if (options.expandWildcardsClosed()) {
concreteIndices = metaData.concreteAllClosedIndices();
} else {
assert false : "Shouldn't end up here";
concreteIndices = Strings.EMPTY_ARRAY;
}
result = new HashSet<>(Arrays.asList(concreteIndices));
List<String> concreteIndices = resolveEmptyOrTrivialWildcard(options, metaData, false);
result = new HashSet<>(concreteIndices);
}
add = false;
expression = expression.substring(1);
}
if (!Regex.isSimpleMatchPattern(expression)) {
if (!options.ignoreUnavailable() && !metaData.getAliasAndIndexLookup().containsKey(expression)) {
IndexNotFoundException infe = new IndexNotFoundException(expression);
infe.setResources("index_or_alias", expression);
throw infe;
if (!unavailableIgnoredOrExists(options, metaData, expression)) {
throw infe(expression);
}
if (result != null) {
if (add) {
@ -595,12 +587,46 @@ public class IndexNameExpressionResolver extends AbstractComponent {
}
if (result == null) {
// add all the previous ones...
result = new HashSet<>();
result.addAll(expressions.subList(0, i));
result = new HashSet<>(expressions.subList(0, i));
}
final IndexMetaData.State excludeState = excludeState(options);
final Map<String, AliasOrIndex> matches = matches(metaData, expression);
Set<String> expand = expand(context, excludeState, matches);
if (add) {
result.addAll(expand);
} else {
result.removeAll(expand);
}
if (!noIndicesAllowedOrMatches(options, matches)) {
throw infe(expression);
}
}
return result;
}
private boolean noIndicesAllowedOrMatches(IndicesOptions options, Map<String, AliasOrIndex> matches) {
return options.allowNoIndices() || !matches.isEmpty();
}
private boolean unavailableIgnoredOrExists(IndicesOptions options, MetaData metaData, String expression) {
return options.ignoreUnavailable() || aliasOrIndexExists(metaData, expression);
}
private boolean aliasOrIndexExists(MetaData metaData, String expression) {
return metaData.getAliasAndIndexLookup().containsKey(expression);
}
private static IndexNotFoundException infe(String expression) {
IndexNotFoundException infe = new IndexNotFoundException(expression);
infe.setResources("index_or_alias", expression);
return infe;
}
private static IndexMetaData.State excludeState(IndicesOptions options) {
final IndexMetaData.State excludeState;
if (options.expandWildcardsOpen() && options.expandWildcardsClosed()){
if (options.expandWildcardsOpen() && options.expandWildcardsClosed()) {
excludeState = null;
} else if (options.expandWildcardsOpen() && options.expandWildcardsClosed() == false) {
excludeState = IndexMetaData.State.CLOSE;
@ -610,28 +636,39 @@ public class IndexNameExpressionResolver extends AbstractComponent {
assert false : "this shouldn't get called if wildcards expand to none";
excludeState = null;
}
return excludeState;
}
final Map<String, AliasOrIndex> matches;
private static Map<String, AliasOrIndex> matches(MetaData metaData, String expression) {
if (Regex.isMatchAllPattern(expression)) {
// Can only happen if the expressions was initially: '-*'
matches = metaData.getAliasAndIndexLookup();
return metaData.getAliasAndIndexLookup();
} else if (expression.indexOf("*") == expression.length() - 1) {
// Suffix wildcard:
return suffixWildcard(metaData, expression);
} else {
return otherWildcard(metaData, expression);
}
}
private static Map<String, AliasOrIndex> suffixWildcard(MetaData metaData, String expression) {
assert expression.length() >= 2 : "expression [" + expression + "] should have at least a length of 2";
String fromPrefix = expression.substring(0, expression.length() - 1);
char[] toPrefixCharArr = fromPrefix.toCharArray();
toPrefixCharArr[toPrefixCharArr.length - 1]++;
String toPrefix = new String(toPrefixCharArr);
matches = metaData.getAliasAndIndexLookup().subMap(fromPrefix, toPrefix);
} else {
// Other wildcard expressions:
return metaData.getAliasAndIndexLookup().subMap(fromPrefix, toPrefix);
}
private static Map<String, AliasOrIndex> otherWildcard(MetaData metaData, String expression) {
final String pattern = expression;
matches = metaData.getAliasAndIndexLookup()
return metaData.getAliasAndIndexLookup()
.entrySet()
.stream()
.filter(e -> Regex.simpleMatch(pattern, e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private static Set<String> expand(Context context, IndexMetaData.State excludeState, Map<String, AliasOrIndex> matches) {
Set<String> expand = new HashSet<>();
for (Map.Entry<String, AliasOrIndex> entry : matches.entrySet()) {
AliasOrIndex aliasOrIndex = entry.getValue();
@ -645,27 +682,24 @@ public class IndexNameExpressionResolver extends AbstractComponent {
}
}
}
if (add) {
result.addAll(expand);
} else {
result.removeAll(expand);
return expand;
}
if (matches.isEmpty() && options.allowNoIndices() == false) {
IndexNotFoundException infe = new IndexNotFoundException(expression);
infe.setResources("index_or_alias", expression);
throw infe;
private boolean isEmptyOrTrivialWildcard(List<String> expressions) {
return expressions.isEmpty() || (expressions.size() == 1 && (MetaData.ALL.equals(expressions.get(0))) || Regex.isMatchAllPattern(expressions.get(0)));
}
private List<String> resolveEmptyOrTrivialWildcard(IndicesOptions options, MetaData metaData, boolean assertEmpty) {
if (options.expandWildcardsOpen() && options.expandWildcardsClosed()) {
return Arrays.asList(metaData.concreteAllIndices());
} else if (options.expandWildcardsOpen()) {
return Arrays.asList(metaData.concreteAllOpenIndices());
} else if (options.expandWildcardsClosed()) {
return Arrays.asList(metaData.concreteAllClosedIndices());
} else {
assert assertEmpty : "Shouldn't end up here";
return Collections.emptyList();
}
if (result == null) {
return expressions;
}
if (result.isEmpty() && !options.allowNoIndices()) {
IndexNotFoundException infe = new IndexNotFoundException((String)null);
infe.setResources("index_or_alias", expressions.toArray(new String[0]));
throw infe;
}
return new ArrayList<>(result);
}
}

View File

@ -399,27 +399,16 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, Fr
// in the index,bulk,update and delete apis.
public String resolveIndexRouting(@Nullable String parent, @Nullable String routing, String aliasOrIndex) {
if (aliasOrIndex == null) {
if (routing == null) {
return parent;
}
return routing;
return routingOrParent(parent, routing);
}
AliasOrIndex result = getAliasAndIndexLookup().get(aliasOrIndex);
if (result == null || result.isAlias() == false) {
if (routing == null) {
return parent;
}
return routing;
return routingOrParent(parent, routing);
}
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) result;
if (result.getIndices().size() > 1) {
String[] indexNames = new String[result.getIndices().size()];
int i = 0;
for (IndexMetaData indexMetaData : result.getIndices()) {
indexNames[i++] = indexMetaData.getIndex().getName();
}
throw new IllegalArgumentException("Alias [" + aliasOrIndex + "] has more than one index associated with it [" + Arrays.toString(indexNames) + "], can't execute a single index op");
rejectSingleIndexOperation(aliasOrIndex, result);
}
AliasMetaData aliasMd = alias.getFirstAliasMetaData();
if (aliasMd.indexRouting() != null) {
@ -434,6 +423,19 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, Fr
// Alias routing overrides the parent routing (if any).
return aliasMd.indexRouting();
}
return routingOrParent(parent, routing);
}
private void rejectSingleIndexOperation(String aliasOrIndex, AliasOrIndex result) {
String[] indexNames = new String[result.getIndices().size()];
int i = 0;
for (IndexMetaData indexMetaData : result.getIndices()) {
indexNames[i++] = indexMetaData.getIndex().getName();
}
throw new IllegalArgumentException("Alias [" + aliasOrIndex + "] has more than one index associated with it [" + Arrays.toString(indexNames) + "], can't execute a single index op");
}
private String routingOrParent(@Nullable String parent, @Nullable String routing) {
if (routing == null) {
return parent;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -46,8 +47,13 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (shardRouting.primary()) {
if (shardRouting.currentNodeId() == null) {
if (shardRouting.restoreSource() != null) {
// restoring from a snapshot - check that the node can handle the version
return isVersionCompatible(shardRouting.restoreSource(), node, allocation);
} else {
// fresh primary, we can allocate wherever
return allocation.decision(Decision.YES, NAME, "primary shard can be allocated anywhere");
}
} else {
// relocating primary, only migrate to newer host
return isVersionCompatible(allocation.routingNodes(), shardRouting.currentNodeId(), node, allocation);
@ -77,4 +83,15 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
target.node().version(), source.node().version());
}
}
private Decision isVersionCompatible(RestoreSource restoreSource, final RoutingNode target, RoutingAllocation allocation) {
if (target.node().version().onOrAfter(restoreSource.version())) {
/* we can allocate if we can restore from a snapshot that is older or on the same version */
return allocation.decision(Decision.YES, NAME, "target node version [%s] is same or newer than snapshot version [%s]",
target.node().version(), restoreSource.version());
} else {
return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than snapshot version [%s]",
target.node().version(), restoreSource.version());
}
}
}

View File

@ -18,8 +18,10 @@
*/
package org.elasticsearch.common;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Locale;
import java.util.Objects;
/**
* <p>Encodes and decodes to and from Base64 notation.</p>
@ -161,7 +163,7 @@ import java.util.Locale;
* @author rob@iharder.net
* @version 2.3.7
*/
public class Base64 {
public final class Base64 {
/* ******** P U B L I C F I E L D S ******** */
@ -791,10 +793,7 @@ public class Base64 {
* @since 2.3.1
*/
public static byte[] encodeBytesToBytes(byte[] source, int off, int len, int options) throws java.io.IOException {
if (source == null) {
throw new NullPointerException("Cannot serialize a null array.");
} // end if: null
Objects.requireNonNull(source, "Cannot serialize a null array.");
if (off < 0) {
throw new IllegalArgumentException("Cannot have negative offset: " + off);
@ -809,47 +808,19 @@ public class Base64 {
String.format(Locale.ROOT, "Cannot have offset of %d and length of %d with array of length %d", off, len, source.length));
} // end if: off < 0
// Compress?
if ((options & GZIP) != 0) {
java.io.ByteArrayOutputStream baos = null;
java.util.zip.GZIPOutputStream gzos = null;
Base64.OutputStream b64os = null;
try {
// GZip -> Base64 -> ByteArray
baos = new java.io.ByteArrayOutputStream();
b64os = new Base64.OutputStream(baos, ENCODE | options);
gzos = new java.util.zip.GZIPOutputStream(b64os);
gzos.write(source, off, len);
gzos.close();
} // end try
catch (java.io.IOException e) {
// Catch it and then throw it immediately so that
// the finally{} block is called for cleanup.
throw e;
} // end catch
finally {
try {
gzos.close();
} catch (Exception e) {
}
try {
b64os.close();
} catch (Exception e) {
}
try {
baos.close();
} catch (Exception e) {
}
} // end finally
return baos.toByteArray();
return encodeCompressedBytes(source, off, len, options);
} // end if: compress
// Else, don't compress. Better not to use streams at all then.
else {
return encodeNonCompressedBytes(source, off, len, options);
} // end else: don't compress
} // end encodeBytesToBytes
private static byte[] encodeNonCompressedBytes(byte[] source, int off, int len, int options) {
boolean breakLines = (options & DO_BREAK_LINES) != 0;
//int len43 = len * 4 / 3;
@ -901,10 +872,44 @@ public class Base64 {
//System.err.println("No need to resize array.");
return outBuff;
}
}
} // end else: don't compress
private static byte[] encodeCompressedBytes(byte[] source, int off, int len, int options) throws IOException {
java.io.ByteArrayOutputStream baos = null;
java.util.zip.GZIPOutputStream gzos = null;
OutputStream b64os = null;
} // end encodeBytesToBytes
try {
// GZip -> Base64 -> ByteArray
baos = new java.io.ByteArrayOutputStream();
b64os = new OutputStream(baos, ENCODE | options);
gzos = new java.util.zip.GZIPOutputStream(b64os);
gzos.write(source, off, len);
gzos.close();
} // end try
catch (IOException e) {
// Catch it and then throw it immediately so that
// the finally{} block is called for cleanup.
throw e;
} // end catch
finally {
try {
gzos.close();
} catch (Exception e) {
}
try {
b64os.close();
} catch (Exception e) {
}
try {
baos.close();
} catch (Exception e) {
}
} // end finally
return baos.toByteArray();
}
/* ******** D E C O D I N G M E T H O D S ******** */
@ -937,17 +942,10 @@ public class Base64 {
* or there is not enough room in the array.
* @since 1.3
*/
private static int decode4to3(
byte[] source, int srcOffset,
byte[] destination, int destOffset, int options) {
private static int decode4to3(byte[] source, int srcOffset, byte[] destination, int destOffset, int options) {
// Lots of error checking and exception throwing
if (source == null) {
throw new NullPointerException("Source array was null.");
} // end if
if (destination == null) {
throw new NullPointerException("Destination array was null.");
} // end if
Objects.requireNonNull(source, "Source array was null.");
Objects.requireNonNull(destination, "Destination array was null.");
if (srcOffset < 0 || srcOffset + 3 >= source.length) {
throw new IllegalArgumentException(String.format(Locale.ROOT,
"Source array with length %d cannot have offset of %d and still process four bytes.", source.length, srcOffset));
@ -957,11 +955,9 @@ public class Base64 {
"Destination array with length %d cannot have offset of %d and still store three bytes.", destination.length, destOffset));
} // end if
byte[] DECODABET = getDecodabet(options);
// Example: Dk==
if (source[srcOffset + 2] == EQUALS_SIGN) {
// Two ways to do the same thing. Don't know which way I like best.
//int outBuff = ( ( DECODABET[ source[ srcOffset ] ] << 24 ) >>> 6 )
// | ( ( DECODABET[ source[ srcOffset + 1] ] << 24 ) >>> 12 );
@ -969,44 +965,26 @@ public class Base64 {
| ((DECODABET[source[srcOffset + 1]] & 0xFF) << 12);
destination[destOffset] = (byte) (outBuff >>> 16);
// Example: Dk==
if (source[srcOffset + 2] == EQUALS_SIGN) {
return 1;
}
// Example: DkL=
else if (source[srcOffset + 3] == EQUALS_SIGN) {
// Two ways to do the same thing. Don't know which way I like best.
//int outBuff = ( ( DECODABET[ source[ srcOffset ] ] << 24 ) >>> 6 )
// | ( ( DECODABET[ source[ srcOffset + 1 ] ] << 24 ) >>> 12 )
// | ( ( DECODABET[ source[ srcOffset + 2 ] ] << 24 ) >>> 18 );
int outBuff = ((DECODABET[source[srcOffset]] & 0xFF) << 18)
| ((DECODABET[source[srcOffset + 1]] & 0xFF) << 12)
| ((DECODABET[source[srcOffset + 2]] & 0xFF) << 6);
destination[destOffset] = (byte) (outBuff >>> 16);
outBuff |= ((DECODABET[source[srcOffset + 2]] & 0xFF) << 6);
destination[destOffset + 1] = (byte) (outBuff >>> 8);
// Example: DkL=
if (source[srcOffset + 3] == EQUALS_SIGN) {
return 2;
}
// Example: DkLE
else {
// Two ways to do the same thing. Don't know which way I like best.
//int outBuff = ( ( DECODABET[ source[ srcOffset ] ] << 24 ) >>> 6 )
// | ( ( DECODABET[ source[ srcOffset + 1 ] ] << 24 ) >>> 12 )
// | ( ( DECODABET[ source[ srcOffset + 2 ] ] << 24 ) >>> 18 )
// | ( ( DECODABET[ source[ srcOffset + 3 ] ] << 24 ) >>> 24 );
int outBuff = ((DECODABET[source[srcOffset]] & 0xFF) << 18)
| ((DECODABET[source[srcOffset + 1]] & 0xFF) << 12)
| ((DECODABET[source[srcOffset + 2]] & 0xFF) << 6)
| ((DECODABET[source[srcOffset + 3]] & 0xFF));
destination[destOffset] = (byte) (outBuff >> 16);
destination[destOffset + 1] = (byte) (outBuff >> 8);
outBuff |= ((DECODABET[source[srcOffset + 3]] & 0xFF));
destination[destOffset + 2] = (byte) (outBuff);
// Example: DkLE
return 3;
}
} // end decodeToBytes
/**
@ -1051,13 +1029,9 @@ public class Base64 {
* @throws java.io.IOException If bogus characters exist in source data
* @since 1.3
*/
public static byte[] decode(byte[] source, int off, int len, int options)
throws java.io.IOException {
public static byte[] decode(byte[] source, int off, int len, int options) throws java.io.IOException {
// Lots of error checking and exception throwing
if (source == null) {
throw new NullPointerException("Cannot decode null source array.");
} // end if
Objects.requireNonNull(source, "Cannot decode null source array.");
if (off < 0 || off + len > source.length) {
throw new IllegalArgumentException(String.format(Locale.ROOT,
"Source array with length %d cannot have offset of %d and process %d bytes.", source.length, off, len));
@ -1074,16 +1048,21 @@ public class Base64 {
int len34 = len * 3 / 4; // Estimate on array size
byte[] outBuff = new byte[len34]; // Upper limit on size of output
int outBuffPosn = 0; // Keep track of where we're writing
int outBuffPosn = decode(source, off, len, options, DECODABET, outBuff);
byte[] out = new byte[outBuffPosn];
System.arraycopy(outBuff, 0, out, 0, outBuffPosn);
return out;
} // end decode
private static int decode(byte[] source, int off, int len, int options, byte[] DECODABET, byte[] outBuff) throws IOException {
int outBuffPosn = 0; // Keep track of where we're writing
byte[] b4 = new byte[4]; // Four byte buffer from source, eliminating white space
int b4Posn = 0; // Keep track of four byte input buffer
int i = 0; // Source array counter
byte sbiDecode = 0; // Special value from DECODABET
for (int i = off; i < off + len; i++) { // Loop through source
for (i = off; i < off + len; i++) { // Loop through source
sbiDecode = DECODABET[source[i] & 0xFF];
byte sbiDecode = DECODABET[source[i] & 0xFF];
// White space, Equals sign, or legit Base64 character
// Note the values such as -5 and -9 in the
@ -1099,7 +1078,7 @@ public class Base64 {
if (source[i] == EQUALS_SIGN) {
// check if the equals sign is somewhere in between
if (i+1 < len + off) {
throw new java.io.IOException(String.format(Locale.ROOT,
throw new IOException(String.format(Locale.ROOT,
"Found equals sign at position %d of the base64 string, not at the end", i));
}
break;
@ -1107,7 +1086,7 @@ public class Base64 {
} // end if: quartet built
else {
if (source[i] == EQUALS_SIGN && len + off > i && source[i+1] != EQUALS_SIGN) {
throw new java.io.IOException(String.format(Locale.ROOT,
throw new IOException(String.format(Locale.ROOT,
"Found equals sign at position %d of the base64 string, not at the end", i));
} // enf if: equals sign and next character not as well
} // end else:
@ -1115,15 +1094,12 @@ public class Base64 {
} // end if: white space, equals sign or better
else {
// There's a bad input character in the Base64 stream.
throw new java.io.IOException(String.format(Locale.ROOT,
throw new IOException(String.format(Locale.ROOT,
"Bad Base64 input character decimal %d in array position %d", ((int) source[i]) & 0xFF, i));
} // end else:
} // each input character
byte[] out = new byte[outBuffPosn];
System.arraycopy(outBuff, 0, out, 0, outBuffPosn);
return out;
} // end decode
return outBuffPosn;
}
/**

View File

@ -118,13 +118,36 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
// .addAndGet() instead of looping (because we don't have to check a
// limit), which makes the RamAccountingTermsEnum case faster.
if (this.memoryBytesLimit == -1) {
newUsed = noLimit(bytes, label);
} else {
newUsed = limit(bytes, label);
}
// Additionally, we need to check that we haven't exceeded the parent's limit
try {
parent.checkParentLimit(label);
} catch (CircuitBreakingException e) {
// If the parent breaker is tripped, this breaker has to be
// adjusted back down because the allocation is "blocked" but the
// breaker has already been incremented
this.addWithoutBreaking(-bytes);
throw e;
}
return newUsed;
}
private long noLimit(long bytes, String label) {
long newUsed;
newUsed = this.used.addAndGet(bytes);
if (logger.isTraceEnabled()) {
logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",
this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed));
}
} else {
// Otherwise, check the addition and commit the addition, looping if
return newUsed;
}
private long limit(long bytes, String label) {
long newUsed;// Otherwise, check the addition and commit the addition, looping if
// there are conflicts. May result in additional logging, but it's
// trace logging and shouldn't be counted on for additions.
long currentUsed;
@ -149,18 +172,6 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
// Attempt to set the new used value, but make sure it hasn't changed
// underneath us, if it has, keep trying until we are able to set it
} while (!this.used.compareAndSet(currentUsed, newUsed));
}
// Additionally, we need to check that we haven't exceeded the parent's limit
try {
parent.checkParentLimit(label);
} catch (CircuitBreakingException e) {
// If the parent breaker is tripped, this breaker has to be
// adjusted back down because the allocation is "blocked" but the
// breaker has already been incremented
this.addWithoutBreaking(-bytes);
throw e;
}
return newUsed;
}

View File

@ -18,8 +18,6 @@
*/
package org.elasticsearch.common.netty;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
@ -74,7 +72,7 @@ public class NettyUtils {
* sized pages, and if its a single one, makes sure that it gets sliced and wrapped in a composite
* buffer.
*/
public static final boolean DEFAULT_GATHERING;
public static final boolean DEFAULT_GATHERING = true;
private static EsThreadNameDeterminer ES_THREAD_NAME_DETERMINER = new EsThreadNameDeterminer();
@ -95,13 +93,6 @@ public class NettyUtils {
});
ThreadRenamingRunnable.setThreadNameDeterminer(ES_THREAD_NAME_DETERMINER);
/**
* This is here just to give us an option to rollback the change, if its stable, we should remove
* the option to even set it.
*/
DEFAULT_GATHERING = Booleans.parseBoolean(System.getProperty("es.netty.gathering"), true);
Loggers.getLogger(NettyUtils.class).debug("using gathering [{}]", DEFAULT_GATHERING);
}
public static void setup() {

View File

@ -265,17 +265,17 @@ public class TimeValue implements Streamable {
long millis;
String lowerSValue = sValue.toLowerCase(Locale.ROOT).trim();
if (lowerSValue.endsWith("ms")) {
millis = (long) (Double.parseDouble(lowerSValue.substring(0, lowerSValue.length() - 2)));
millis = parse(lowerSValue, 2, 1);
} else if (lowerSValue.endsWith("s")) {
millis = (long) Double.parseDouble(lowerSValue.substring(0, lowerSValue.length() - 1)) * 1000;
millis = parse(lowerSValue, 1, 1000);
} else if (lowerSValue.endsWith("m")) {
millis = (long) (Double.parseDouble(lowerSValue.substring(0, lowerSValue.length() - 1)) * 60 * 1000);
millis = parse(lowerSValue, 1, 60 * 1000);
} else if (lowerSValue.endsWith("h")) {
millis = (long) (Double.parseDouble(lowerSValue.substring(0, lowerSValue.length() - 1)) * 60 * 60 * 1000);
millis = parse(lowerSValue, 1, 60 * 60 * 1000);
} else if (lowerSValue.endsWith("d")) {
millis = (long) (Double.parseDouble(lowerSValue.substring(0, lowerSValue.length() - 1)) * 24 * 60 * 60 * 1000);
millis = parse(lowerSValue, 1, 24 * 60 * 60 * 1000);
} else if (lowerSValue.endsWith("w")) {
millis = (long) (Double.parseDouble(lowerSValue.substring(0, lowerSValue.length() - 1)) * 7 * 24 * 60 * 60 * 1000);
millis = parse(lowerSValue, 1, 7 * 24 * 60 * 60 * 1000);
} else if (lowerSValue.equals("-1")) {
// Allow this special value to be unit-less:
millis = -1;
@ -292,6 +292,10 @@ public class TimeValue implements Streamable {
}
}
private static long parse(String s, int suffixLength, long scale) {
return (long) (Double.parseDouble(s.substring(0, s.length() - suffixLength)) * scale);
}
static final long C0 = 1L;
static final long C1 = C0 * 1000L;
static final long C2 = C1 * 1000L;

View File

@ -43,9 +43,12 @@ import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;
/**
*
@ -1227,52 +1230,101 @@ public final class XContentBuilder implements BytesStream, Releasable {
generator.writeEndObject();
}
@FunctionalInterface
interface Writer {
void write(XContentGenerator g, Object v) throws IOException;
}
private final static Map<Class<?>, Writer> MAP;
static {
Map<Class<?>, Writer> map = new HashMap<>();
map.put(String.class, (g, v) -> g.writeString((String) v));
map.put(Integer.class, (g, v) -> g.writeNumber((Integer) v));
map.put(Long.class, (g, v) -> g.writeNumber((Long) v));
map.put(Float.class, (g, v) -> g.writeNumber((Float) v));
map.put(Double.class, (g, v) -> g.writeNumber((Double) v));
map.put(Byte.class, (g, v) -> g.writeNumber((Byte) v));
map.put(Short.class, (g, v) -> g.writeNumber((Short) v));
map.put(Boolean.class, (g, v) -> g.writeBoolean((Boolean) v));
map.put(GeoPoint.class, (g, v) -> {
g.writeStartObject();
g.writeNumberField("lat", ((GeoPoint) v).lat());
g.writeNumberField("lon", ((GeoPoint) v).lon());
g.writeEndObject();
});
map.put(int[].class, (g, v) -> {
g.writeStartArray();
for (int item : (int[]) v) {
g.writeNumber(item);
}
g.writeEndArray();
});
map.put(long[].class, (g, v) -> {
g.writeStartArray();
for (long item : (long[]) v) {
g.writeNumber(item);
}
g.writeEndArray();
});
map.put(float[].class, (g, v) -> {
g.writeStartArray();
for (float item : (float[]) v) {
g.writeNumber(item);
}
g.writeEndArray();
});
map.put(double[].class, (g, v) -> {
g.writeStartArray();
for (double item : (double[])v) {
g.writeNumber(item);
}
g.writeEndArray();
});
map.put(byte[].class, (g, v) -> g.writeBinary((byte[]) v));
map.put(short[].class, (g, v) -> {
g.writeStartArray();
for (short item : (short[])v) {
g.writeNumber(item);
}
g.writeEndArray();
});
map.put(BytesRef.class, (g, v) -> {
BytesRef bytes = (BytesRef) v;
g.writeBinary(bytes.bytes, bytes.offset, bytes.length);
});
map.put(Text.class, (g, v) -> {
Text text = (Text) v;
if (text.hasBytes() && text.bytes().hasArray()) {
g.writeUTF8String(text.bytes().array(), text.bytes().arrayOffset(), text.bytes().length());
} else if (text.hasString()) {
g.writeString(text.string());
} else {
BytesArray bytesArray = text.bytes().toBytesArray();
g.writeUTF8String(bytesArray.array(), bytesArray.arrayOffset(), bytesArray.length());
}
});
MAP = Collections.unmodifiableMap(map);
}
private void writeValue(Object value) throws IOException {
if (value == null) {
generator.writeNull();
return;
}
Class<?> type = value.getClass();
if (type == String.class) {
generator.writeString((String) value);
} else if (type == Integer.class) {
generator.writeNumber(((Integer) value).intValue());
} else if (type == Long.class) {
generator.writeNumber(((Long) value).longValue());
} else if (type == Float.class) {
generator.writeNumber(((Float) value).floatValue());
} else if (type == Double.class) {
generator.writeNumber(((Double) value).doubleValue());
} else if (type == Byte.class) {
generator.writeNumber(((Byte)value).byteValue());
} else if (type == Short.class) {
generator.writeNumber(((Short) value).shortValue());
} else if (type == Boolean.class) {
generator.writeBoolean(((Boolean) value).booleanValue());
} else if (type == GeoPoint.class) {
generator.writeStartObject();
generator.writeNumberField("lat", ((GeoPoint) value).lat());
generator.writeNumberField("lon", ((GeoPoint) value).lon());
generator.writeEndObject();
Writer writer = MAP.get(type);
if (writer != null) {
writer.write(generator, value);
} else if (value instanceof Map) {
writeMap((Map) value);
} else if (value instanceof Path) {
//Path implements Iterable<Path> and causes endless recursion and a StackOverFlow if treated as an Iterable here
generator.writeString(value.toString());
} else if (value instanceof Iterable) {
generator.writeStartArray();
for (Object v : (Iterable<?>) value) {
writeValue(v);
}
generator.writeEndArray();
writeIterable((Iterable<?>) value);
} else if (value instanceof Object[]) {
generator.writeStartArray();
for (Object v : (Object[]) value) {
writeValue(v);
}
generator.writeEndArray();
} else if (type == byte[].class) {
generator.writeBinary((byte[]) value);
writeObjectArray((Object[]) value);
} else if (value instanceof Date) {
generator.writeString(XContentBuilder.defaultDatePrinter.print(((Date) value).getTime()));
} else if (value instanceof Calendar) {
@ -1280,56 +1332,9 @@ public final class XContentBuilder implements BytesStream, Releasable {
} else if (value instanceof ReadableInstant) {
generator.writeString(XContentBuilder.defaultDatePrinter.print((((ReadableInstant) value)).getMillis()));
} else if (value instanceof BytesReference) {
BytesReference bytes = (BytesReference) value;
if (!bytes.hasArray()) {
bytes = bytes.toBytesArray();
}
generator.writeBinary(bytes.array(), bytes.arrayOffset(), bytes.length());
} else if (value instanceof BytesRef) {
BytesRef bytes = (BytesRef) value;
generator.writeBinary(bytes.bytes, bytes.offset, bytes.length);
} else if (value instanceof Text) {
Text text = (Text) value;
if (text.hasBytes() && text.bytes().hasArray()) {
generator.writeUTF8String(text.bytes().array(), text.bytes().arrayOffset(), text.bytes().length());
} else if (text.hasString()) {
generator.writeString(text.string());
} else {
BytesArray bytesArray = text.bytes().toBytesArray();
generator.writeUTF8String(bytesArray.array(), bytesArray.arrayOffset(), bytesArray.length());
}
writeBytesReference((BytesReference) value);
} else if (value instanceof ToXContent) {
((ToXContent) value).toXContent(this, ToXContent.EMPTY_PARAMS);
} else if (value instanceof double[]) {
generator.writeStartArray();
for (double v : (double[]) value) {
generator.writeNumber(v);
}
generator.writeEndArray();
} else if (value instanceof long[]) {
generator.writeStartArray();
for (long v : (long[]) value) {
generator.writeNumber(v);
}
generator.writeEndArray();
} else if (value instanceof int[]) {
generator.writeStartArray();
for (int v : (int[]) value) {
generator.writeNumber(v);
}
generator.writeEndArray();
} else if (value instanceof float[]) {
generator.writeStartArray();
for (float v : (float[]) value) {
generator.writeNumber(v);
}
generator.writeEndArray();
} else if (value instanceof short[]) {
generator.writeStartArray();
for (short v : (short[]) value) {
generator.writeNumber(v);
}
generator.writeEndArray();
} else {
// if this is a "value" object, like enum, DistanceUnit, ..., just toString it
// yea, it can be misleading when toString a Java class, but really, jackson should be used in that case
@ -1337,4 +1342,29 @@ public final class XContentBuilder implements BytesStream, Releasable {
//throw new ElasticsearchIllegalArgumentException("type not supported for generic value conversion: " + type);
}
}
private void writeBytesReference(BytesReference value) throws IOException {
BytesReference bytes = value;
if (!bytes.hasArray()) {
bytes = bytes.toBytesArray();
}
generator.writeBinary(bytes.array(), bytes.arrayOffset(), bytes.length());
}
private void writeIterable(Iterable<?> value) throws IOException {
generator.writeStartArray();
for (Object v : value) {
writeValue(v);
}
generator.writeEndArray();
}
private void writeObjectArray(Object[] value) throws IOException {
generator.writeStartArray();
for (Object v : value) {
writeValue(v);
}
generator.writeEndArray();
}
}

View File

@ -43,6 +43,8 @@ import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -51,10 +53,7 @@ import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.KEEP_ALIVE;
/**
*
*/
public class NettyHttpChannel extends HttpChannel {
public final class NettyHttpChannel extends HttpChannel {
private final NettyHttpServerTransport transport;
private final Channel channel;
@ -92,18 +91,11 @@ public class NettyHttpChannel extends HttpChannel {
String opaque = nettyRequest.headers().get("X-Opaque-Id");
if (opaque != null) {
resp.headers().add("X-Opaque-Id", opaque);
setHeaderField(resp, "X-Opaque-Id", opaque);
}
// Add all custom headers
Map<String, List<String>> customHeaders = response.getHeaders();
if (customHeaders != null) {
for (Map.Entry<String, List<String>> headerEntry : customHeaders.entrySet()) {
for (String headerValue : headerEntry.getValue()) {
resp.headers().add(headerEntry.getKey(), headerValue);
}
}
}
addCustomHeaders(response, resp);
BytesReference content = response.content();
ChannelBuffer buffer;
@ -113,30 +105,11 @@ public class NettyHttpChannel extends HttpChannel {
resp.setContent(buffer);
// If our response doesn't specify a content-type header, set one
if (!resp.headers().contains(HttpHeaders.Names.CONTENT_TYPE)) {
resp.headers().add(HttpHeaders.Names.CONTENT_TYPE, response.contentType());
}
setHeaderField(resp, HttpHeaders.Names.CONTENT_TYPE, response.contentType(), false);
// If our response has no content-length, calculate and set one
if (!resp.headers().contains(HttpHeaders.Names.CONTENT_LENGTH)) {
resp.headers().add(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buffer.readableBytes()));
}
setHeaderField(resp, HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buffer.readableBytes()), false);
if (transport.resetCookies) {
String cookieString = nettyRequest.headers().get(HttpHeaders.Names.COOKIE);
if (cookieString != null) {
CookieDecoder cookieDecoder = new CookieDecoder();
Set<Cookie> cookies = cookieDecoder.decode(cookieString);
if (!cookies.isEmpty()) {
// Reset the cookies if necessary.
CookieEncoder cookieEncoder = new CookieEncoder(true);
for (Cookie cookie : cookies) {
cookieEncoder.addCookie(cookie);
}
resp.headers().add(HttpHeaders.Names.SET_COOKIE, cookieEncoder.encode());
}
}
}
addCookies(resp);
ChannelFuture future;
@ -164,6 +137,45 @@ public class NettyHttpChannel extends HttpChannel {
}
}
private void setHeaderField(HttpResponse resp, String headerField, String value) {
setHeaderField(resp, headerField, value, true);
}
private void setHeaderField(HttpResponse resp, String headerField, String value, boolean override) {
if (override || !resp.headers().contains(headerField)) {
resp.headers().add(headerField, value);
}
}
private void addCookies(HttpResponse resp) {
if (transport.resetCookies) {
String cookieString = nettyRequest.headers().get(HttpHeaders.Names.COOKIE);
if (cookieString != null) {
CookieDecoder cookieDecoder = new CookieDecoder();
Set<Cookie> cookies = cookieDecoder.decode(cookieString);
if (!cookies.isEmpty()) {
// Reset the cookies if necessary.
CookieEncoder cookieEncoder = new CookieEncoder(true);
for (Cookie cookie : cookies) {
cookieEncoder.addCookie(cookie);
}
setHeaderField(resp, HttpHeaders.Names.SET_COOKIE, cookieEncoder.encode());
}
}
}
}
private void addCustomHeaders(RestResponse response, HttpResponse resp) {
Map<String, List<String>> customHeaders = response.getHeaders();
if (customHeaders != null) {
for (Map.Entry<String, List<String>> headerEntry : customHeaders.entrySet()) {
for (String headerValue : headerEntry.getValue()) {
setHeaderField(resp, headerEntry.getKey(), headerValue);
}
}
}
}
// Determine if the request protocol version is HTTP 1.0
private boolean isHttp10() {
return nettyRequest.getProtocolVersion().equals(HttpVersion.HTTP_1_0);
@ -196,101 +208,59 @@ public class NettyHttpChannel extends HttpChannel {
private static final HttpResponseStatus TOO_MANY_REQUESTS = new HttpResponseStatus(429, "Too Many Requests");
private HttpResponseStatus getStatus(RestStatus status) {
switch (status) {
case CONTINUE:
return HttpResponseStatus.CONTINUE;
case SWITCHING_PROTOCOLS:
return HttpResponseStatus.SWITCHING_PROTOCOLS;
case OK:
return HttpResponseStatus.OK;
case CREATED:
return HttpResponseStatus.CREATED;
case ACCEPTED:
return HttpResponseStatus.ACCEPTED;
case NON_AUTHORITATIVE_INFORMATION:
return HttpResponseStatus.NON_AUTHORITATIVE_INFORMATION;
case NO_CONTENT:
return HttpResponseStatus.NO_CONTENT;
case RESET_CONTENT:
return HttpResponseStatus.RESET_CONTENT;
case PARTIAL_CONTENT:
return HttpResponseStatus.PARTIAL_CONTENT;
case MULTI_STATUS:
// no status for this??
return HttpResponseStatus.INTERNAL_SERVER_ERROR;
case MULTIPLE_CHOICES:
return HttpResponseStatus.MULTIPLE_CHOICES;
case MOVED_PERMANENTLY:
return HttpResponseStatus.MOVED_PERMANENTLY;
case FOUND:
return HttpResponseStatus.FOUND;
case SEE_OTHER:
return HttpResponseStatus.SEE_OTHER;
case NOT_MODIFIED:
return HttpResponseStatus.NOT_MODIFIED;
case USE_PROXY:
return HttpResponseStatus.USE_PROXY;
case TEMPORARY_REDIRECT:
return HttpResponseStatus.TEMPORARY_REDIRECT;
case BAD_REQUEST:
return HttpResponseStatus.BAD_REQUEST;
case UNAUTHORIZED:
return HttpResponseStatus.UNAUTHORIZED;
case PAYMENT_REQUIRED:
return HttpResponseStatus.PAYMENT_REQUIRED;
case FORBIDDEN:
return HttpResponseStatus.FORBIDDEN;
case NOT_FOUND:
return HttpResponseStatus.NOT_FOUND;
case METHOD_NOT_ALLOWED:
return HttpResponseStatus.METHOD_NOT_ALLOWED;
case NOT_ACCEPTABLE:
return HttpResponseStatus.NOT_ACCEPTABLE;
case PROXY_AUTHENTICATION:
return HttpResponseStatus.PROXY_AUTHENTICATION_REQUIRED;
case REQUEST_TIMEOUT:
return HttpResponseStatus.REQUEST_TIMEOUT;
case CONFLICT:
return HttpResponseStatus.CONFLICT;
case GONE:
return HttpResponseStatus.GONE;
case LENGTH_REQUIRED:
return HttpResponseStatus.LENGTH_REQUIRED;
case PRECONDITION_FAILED:
return HttpResponseStatus.PRECONDITION_FAILED;
case REQUEST_ENTITY_TOO_LARGE:
return HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
case REQUEST_URI_TOO_LONG:
return HttpResponseStatus.REQUEST_URI_TOO_LONG;
case UNSUPPORTED_MEDIA_TYPE:
return HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE;
case REQUESTED_RANGE_NOT_SATISFIED:
return HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE;
case EXPECTATION_FAILED:
return HttpResponseStatus.EXPECTATION_FAILED;
case UNPROCESSABLE_ENTITY:
return HttpResponseStatus.BAD_REQUEST;
case LOCKED:
return HttpResponseStatus.BAD_REQUEST;
case FAILED_DEPENDENCY:
return HttpResponseStatus.BAD_REQUEST;
case TOO_MANY_REQUESTS:
return TOO_MANY_REQUESTS;
case INTERNAL_SERVER_ERROR:
return HttpResponseStatus.INTERNAL_SERVER_ERROR;
case NOT_IMPLEMENTED:
return HttpResponseStatus.NOT_IMPLEMENTED;
case BAD_GATEWAY:
return HttpResponseStatus.BAD_GATEWAY;
case SERVICE_UNAVAILABLE:
return HttpResponseStatus.SERVICE_UNAVAILABLE;
case GATEWAY_TIMEOUT:
return HttpResponseStatus.GATEWAY_TIMEOUT;
case HTTP_VERSION_NOT_SUPPORTED:
return HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED;
default:
return HttpResponseStatus.INTERNAL_SERVER_ERROR;
static Map<RestStatus, HttpResponseStatus> MAP;
static {
EnumMap<RestStatus, HttpResponseStatus> map = new EnumMap<>(RestStatus.class);
map.put(RestStatus.CONTINUE, HttpResponseStatus.CONTINUE);
map.put(RestStatus.SWITCHING_PROTOCOLS, HttpResponseStatus.SWITCHING_PROTOCOLS);
map.put(RestStatus.OK, HttpResponseStatus.OK);
map.put(RestStatus.CREATED, HttpResponseStatus.CREATED);
map.put(RestStatus.ACCEPTED, HttpResponseStatus.ACCEPTED);
map.put(RestStatus.NON_AUTHORITATIVE_INFORMATION, HttpResponseStatus.NON_AUTHORITATIVE_INFORMATION);
map.put(RestStatus.NO_CONTENT, HttpResponseStatus.NO_CONTENT);
map.put(RestStatus.RESET_CONTENT, HttpResponseStatus.RESET_CONTENT);
map.put(RestStatus.PARTIAL_CONTENT, HttpResponseStatus.PARTIAL_CONTENT);
map.put(RestStatus.MULTI_STATUS, HttpResponseStatus.INTERNAL_SERVER_ERROR); // no status for this??
map.put(RestStatus.MULTIPLE_CHOICES, HttpResponseStatus.MULTIPLE_CHOICES);
map.put(RestStatus.MOVED_PERMANENTLY, HttpResponseStatus.MOVED_PERMANENTLY);
map.put(RestStatus.FOUND, HttpResponseStatus.FOUND);
map.put(RestStatus.SEE_OTHER, HttpResponseStatus.SEE_OTHER);
map.put(RestStatus.NOT_MODIFIED, HttpResponseStatus.NOT_MODIFIED);
map.put(RestStatus.USE_PROXY, HttpResponseStatus.USE_PROXY);
map.put(RestStatus.TEMPORARY_REDIRECT, HttpResponseStatus.TEMPORARY_REDIRECT);
map.put(RestStatus.BAD_REQUEST, HttpResponseStatus.BAD_REQUEST);
map.put(RestStatus.UNAUTHORIZED, HttpResponseStatus.UNAUTHORIZED);
map.put(RestStatus.PAYMENT_REQUIRED, HttpResponseStatus.PAYMENT_REQUIRED);
map.put(RestStatus.FORBIDDEN, HttpResponseStatus.FORBIDDEN);
map.put(RestStatus.NOT_FOUND, HttpResponseStatus.NOT_FOUND);
map.put(RestStatus.METHOD_NOT_ALLOWED, HttpResponseStatus.METHOD_NOT_ALLOWED);
map.put(RestStatus.NOT_ACCEPTABLE, HttpResponseStatus.NOT_ACCEPTABLE);
map.put(RestStatus.PROXY_AUTHENTICATION, HttpResponseStatus.PROXY_AUTHENTICATION_REQUIRED);
map.put(RestStatus.REQUEST_TIMEOUT, HttpResponseStatus.REQUEST_TIMEOUT);
map.put(RestStatus.CONFLICT, HttpResponseStatus.CONFLICT);
map.put(RestStatus.GONE, HttpResponseStatus.GONE);
map.put(RestStatus.LENGTH_REQUIRED, HttpResponseStatus.LENGTH_REQUIRED);
map.put(RestStatus.PRECONDITION_FAILED, HttpResponseStatus.PRECONDITION_FAILED);
map.put(RestStatus.REQUEST_ENTITY_TOO_LARGE, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);
map.put(RestStatus.REQUEST_URI_TOO_LONG, HttpResponseStatus.REQUEST_URI_TOO_LONG);
map.put(RestStatus.UNSUPPORTED_MEDIA_TYPE, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE);
map.put(RestStatus.REQUESTED_RANGE_NOT_SATISFIED, HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE);
map.put(RestStatus.EXPECTATION_FAILED, HttpResponseStatus.EXPECTATION_FAILED);
map.put(RestStatus.UNPROCESSABLE_ENTITY, HttpResponseStatus.BAD_REQUEST);
map.put(RestStatus.LOCKED, HttpResponseStatus.BAD_REQUEST);
map.put(RestStatus.FAILED_DEPENDENCY, HttpResponseStatus.BAD_REQUEST);
map.put(RestStatus.TOO_MANY_REQUESTS, TOO_MANY_REQUESTS);
map.put(RestStatus.INTERNAL_SERVER_ERROR, HttpResponseStatus.INTERNAL_SERVER_ERROR);
map.put(RestStatus.NOT_IMPLEMENTED, HttpResponseStatus.NOT_IMPLEMENTED);
map.put(RestStatus.BAD_GATEWAY, HttpResponseStatus.BAD_GATEWAY);
map.put(RestStatus.SERVICE_UNAVAILABLE, HttpResponseStatus.SERVICE_UNAVAILABLE);
map.put(RestStatus.GATEWAY_TIMEOUT, HttpResponseStatus.GATEWAY_TIMEOUT);
map.put(RestStatus.HTTP_VERSION_NOT_SUPPORTED, HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED);
MAP = Collections.unmodifiableMap(map);
}
private static HttpResponseStatus getStatus(RestStatus status) {
return MAP.getOrDefault(status, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}

View File

@ -362,13 +362,12 @@ public class InternalEngine extends Engine {
}
long expectedVersion = index.version();
if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
if (index.origin() == Operation.Origin.RECOVERY) {
return false;
} else {
if (isVersionConflictForWrites(index, currentVersion, deleted, expectedVersion)) {
if (index.origin() != Operation.Origin.RECOVERY) {
throw new VersionConflictEngineException(shardId, index.type(), index.id(),
index.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
}
return false;
}
long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
@ -378,12 +377,20 @@ public class InternalEngine extends Engine {
if (currentVersion == Versions.NOT_FOUND) {
// document does not exists, we can optimize for create
created = true;
if (index.docs().size() > 1) {
indexWriter.addDocuments(index.docs());
index(index, indexWriter);
} else {
indexWriter.addDocument(index.docs().get(0));
created = update(index, versionValue, indexWriter);
}
} else {
Translog.Location translogLocation = translog.add(new Translog.Index(index));
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
index.setTranslogLocation(translogLocation);
return created;
}
}
private static boolean update(Index index, VersionValue versionValue, IndexWriter indexWriter) throws IOException {
boolean created;
if (versionValue != null) {
created = versionValue.delete(); // we have a delete which is not GC'ed...
} else {
@ -394,13 +401,19 @@ public class InternalEngine extends Engine {
} else {
indexWriter.updateDocument(index.uid(), index.docs().get(0));
}
}
Translog.Location translogLocation = translog.add(new Translog.Index(index));
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
index.setTranslogLocation(translogLocation);
return created;
}
private static void index(Index index, IndexWriter indexWriter) throws IOException {
if (index.docs().size() > 1) {
indexWriter.addDocuments(index.docs());
} else {
indexWriter.addDocument(index.docs().get(0));
}
}
private boolean isVersionConflictForWrites(Index index, long currentVersion, boolean deleted, long expectedVersion) {
return index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted);
}
@Override

View File

@ -190,8 +190,14 @@ public final class OrdinalsBuilder implements Closeable {
public int addOrdinal(int docID, long ordinal) {
final long position = positions.get(docID);
if (position == 0L) { // on the first level
return firstLevel(docID, ordinal);
} else {
return nonFirstLevel(docID, ordinal, position);
}
}
private int firstLevel(int docID, long ordinal) {
// 0 or 1 ordinal
if (firstOrdinals.get(docID) == 0L) {
firstOrdinals.set(docID, ordinal + 1);
@ -207,7 +213,9 @@ public final class OrdinalsBuilder implements Closeable {
positions.set(docID, position(1, offset)); // current position is on the 1st level and not allocated yet
return 2;
}
} else {
}
private int nonFirstLevel(int docID, long ordinal, long position) {
int level = level(position);
long offset = offset(position, level);
assert offset != 0L;
@ -230,7 +238,6 @@ public final class OrdinalsBuilder implements Closeable {
positions.set(docID, newPosition);
return numOrdinals(level, offset);
}
}
public void appendOrdinals(int docID, LongsRef ords) {
// First level

View File

@ -75,43 +75,40 @@ class DocumentParser implements Closeable {
this.docMapper = docMapper;
}
public ParsedDocument parseDocument(SourceToParse source) throws MapperParsingException {
if (docMapper.type().equals(MapperService.DEFAULT_MAPPING)) {
throw new IllegalArgumentException("It is forbidden to index into the default mapping [" + MapperService.DEFAULT_MAPPING + "]");
}
final ParsedDocument parseDocument(SourceToParse source) throws MapperParsingException {
validateType(source);
ParseContext.InternalParseContext context = cache.get();
final Mapping mapping = docMapper.mapping();
if (source.type() != null && !source.type().equals(docMapper.type())) {
throw new MapperParsingException("Type mismatch, provide type [" + source.type() + "] but mapper is of type [" + docMapper.type() + "]");
}
source.type(docMapper.type());
XContentParser parser = source.parser();
final Mapping mapping = docMapper.mapping();
final ParseContext.InternalParseContext context = cache.get();
XContentParser parser = null;
try {
if (parser == null) {
parser = XContentHelper.createParser(source.source());
}
parser = parser(source);
context.reset(parser, new ParseContext.Document(), source);
// will result in START_OBJECT
XContentParser.Token token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new MapperParsingException("Malformed content, must start with an object");
validateStart(parser);
internalParseDocument(mapping, context, parser);
validateEnd(source, parser);
} catch (Throwable t) {
throw wrapInMapperParsingException(source, t);
} finally {
// only close the parser when its not provided externally
if (internalParser(source, parser)) {
parser.close();
}
}
boolean emptyDoc = false;
if (mapping.root.isEnabled()) {
token = parser.nextToken();
if (token == XContentParser.Token.END_OBJECT) {
// empty doc, we can handle it...
emptyDoc = true;
} else if (token != XContentParser.Token.FIELD_NAME) {
throw new MapperParsingException("Malformed content, after first object, either the type field or the actual properties should exist");
}
reverseOrder(context);
applyDocBoost(context);
ParsedDocument doc = parsedDocument(source, context, update(context, mapping));
// reset the context to free up memory
context.reset(null, null, null);
return doc;
}
private static void internalParseDocument(Mapping mapping, ParseContext.InternalParseContext context, XContentParser parser) throws IOException {
final boolean emptyDoc = isEmptyDoc(mapping, parser);
for (MetadataFieldMapper metadataMapper : mapping.metadataMappers) {
metadataMapper.preParse(context);
}
@ -129,39 +126,67 @@ class DocumentParser implements Closeable {
for (MetadataFieldMapper metadataMapper : mapping.metadataMappers) {
metadataMapper.postParse(context);
}
}
private void validateType(SourceToParse source) {
if (docMapper.type().equals(MapperService.DEFAULT_MAPPING)) {
throw new IllegalArgumentException("It is forbidden to index into the default mapping [" + MapperService.DEFAULT_MAPPING + "]");
}
if (source.type() != null && !source.type().equals(docMapper.type())) {
throw new MapperParsingException("Type mismatch, provide type [" + source.type() + "] but mapper is of type [" + docMapper.type() + "]");
}
}
private static XContentParser parser(SourceToParse source) throws IOException {
return source.parser() == null ? XContentHelper.createParser(source.source()) : source.parser();
}
private static boolean internalParser(SourceToParse source, XContentParser parser) {
return source.parser() == null && parser != null;
}
private static void validateStart(XContentParser parser) throws IOException {
// will result in START_OBJECT
XContentParser.Token token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new MapperParsingException("Malformed content, must start with an object");
}
}
private static void validateEnd(SourceToParse source, XContentParser parser) throws IOException {
XContentParser.Token token;// only check for end of tokens if we created the parser here
if (internalParser(source, parser)) {
// try to parse the next token, this should be null if the object is ended properly
// but will throw a JSON exception if the extra tokens is not valid JSON (this will be handled by the catch)
if (source.parser() == null && parser != null) {
// only check for end of tokens if we created the parser here
token = parser.nextToken();
if (token != null) {
throw new IllegalArgumentException("Malformed content, found extra data after parsing: " + token);
}
}
} catch (Throwable e) {
// if its already a mapper parsing exception, no need to wrap it...
if (e instanceof MapperParsingException) {
throw (MapperParsingException) e;
}
// Throw a more meaningful message if the document is empty.
if (source.source() != null && source.source().length() == 0) {
throw new MapperParsingException("failed to parse, document is empty");
private static boolean isEmptyDoc(Mapping mapping, XContentParser parser) throws IOException {
if (mapping.root.isEnabled()) {
final XContentParser.Token token = parser.nextToken();
if (token == XContentParser.Token.END_OBJECT) {
// empty doc, we can handle it...
return true;
} else if (token != XContentParser.Token.FIELD_NAME) {
throw new MapperParsingException("Malformed content, after first object, either the type field or the actual properties should exist");
}
}
return false;
}
throw new MapperParsingException("failed to parse", e);
} finally {
// only close the parser when its not provided externally
if (source.parser() == null && parser != null) {
parser.close();
}
}
private static void reverseOrder(ParseContext.InternalParseContext context) {
// reverse the order of docs for nested docs support, parent should be last
if (context.docs().size() > 1) {
Collections.reverse(context.docs());
}
}
private static void applyDocBoost(ParseContext.InternalParseContext context) {
// apply doc boost
if (context.docBoost() != 1.0f) {
Set<String> encounteredFields = new HashSet<>();
@ -177,18 +202,41 @@ class DocumentParser implements Closeable {
}
}
}
Mapper rootDynamicUpdate = context.dynamicMappingsUpdate();
Mapping update = null;
if (rootDynamicUpdate != null) {
update = mapping.mappingUpdate(rootDynamicUpdate);
}
ParsedDocument doc = new ParsedDocument(context.uid(), context.version(), context.id(), context.type(), source.routing(), source.timestamp(), source.ttl(), context.docs(),
context.source(), update).parent(source.parent());
// reset the context to free up memory
context.reset(null, null, null);
return doc;
private static ParsedDocument parsedDocument(SourceToParse source, ParseContext.InternalParseContext context, Mapping update) {
return new ParsedDocument(
context.uid(),
context.version(),
context.id(),
context.type(),
source.routing(),
source.timestamp(),
source.ttl(),
context.docs(),
context.source(),
update
).parent(source.parent());
}
private static Mapping update(ParseContext.InternalParseContext context, Mapping mapping) {
Mapper rootDynamicUpdate = context.dynamicMappingsUpdate();
return rootDynamicUpdate != null ? mapping.mappingUpdate(rootDynamicUpdate) : null;
}
private static MapperParsingException wrapInMapperParsingException(SourceToParse source, Throwable e) {
// if its already a mapper parsing exception, no need to wrap it...
if (e instanceof MapperParsingException) {
return (MapperParsingException) e;
}
// Throw a more meaningful message if the document is empty.
if (source.source() != null && source.source().length() == 0) {
return new MapperParsingException("failed to parse, document is empty");
}
return new MapperParsingException("failed to parse", e);
}
static ObjectMapper parseObject(ParseContext context, ObjectMapper mapper, boolean atRoot) throws IOException {
@ -214,22 +262,7 @@ class DocumentParser implements Closeable {
ObjectMapper.Nested nested = mapper.nested();
if (nested.isNested()) {
context = context.createNestedContext(mapper.fullPath());
ParseContext.Document nestedDoc = context.doc();
ParseContext.Document parentDoc = nestedDoc.getParent();
// pre add the uid field if possible (id was already provided)
IndexableField uidField = parentDoc.getField(UidFieldMapper.NAME);
if (uidField != null) {
// we don't need to add it as a full uid field in nested docs, since we don't need versioning
// we also rely on this for UidField#loadVersion
// this is a deeply nested field
nestedDoc.add(new Field(UidFieldMapper.NAME, uidField.stringValue(), UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
}
// the type of the nested doc starts with __, so we can identify that its a nested one in filters
// note, we don't prefix it with the type of the doc since it allows us to execute a nested query
// across types (for example, with similar nested objects)
nestedDoc.add(new Field(TypeFieldMapper.NAME, mapper.nestedTypePathAsString(), TypeFieldMapper.Defaults.FIELD_TYPE));
context = nestedContext(context, mapper);
}
// if we are at the end of the previous object, advance
@ -242,6 +275,15 @@ class DocumentParser implements Closeable {
}
ObjectMapper update = null;
update = innerParseObject(context, mapper, parser, currentFieldName, token, update);
// restore the enable path flag
if (nested.isNested()) {
nested(context, nested);
}
return update;
}
private static ObjectMapper innerParseObject(ParseContext context, ObjectMapper mapper, XContentParser parser, String currentFieldName, XContentParser.Token token, ObjectMapper update) throws IOException {
while (token != XContentParser.Token.END_OBJECT) {
ObjectMapper newUpdate = null;
if (token == XContentParser.Token.START_OBJECT) {
@ -266,34 +308,50 @@ class DocumentParser implements Closeable {
}
}
}
// restore the enable path flag
if (nested.isNested()) {
return update;
}
private static void nested(ParseContext context, ObjectMapper.Nested nested) {
ParseContext.Document nestedDoc = context.doc();
ParseContext.Document parentDoc = nestedDoc.getParent();
if (nested.isIncludeInParent()) {
for (IndexableField field : nestedDoc.getFields()) {
if (field.name().equals(UidFieldMapper.NAME) || field.name().equals(TypeFieldMapper.NAME)) {
continue;
} else {
parentDoc.add(field);
}
}
addFields(nestedDoc, parentDoc);
}
if (nested.isIncludeInRoot()) {
ParseContext.Document rootDoc = context.rootDoc();
// don't add it twice, if its included in parent, and we are handling the master doc...
if (!nested.isIncludeInParent() || parentDoc != rootDoc) {
addFields(nestedDoc, rootDoc);
}
}
}
private static void addFields(ParseContext.Document nestedDoc, ParseContext.Document rootDoc) {
for (IndexableField field : nestedDoc.getFields()) {
if (field.name().equals(UidFieldMapper.NAME) || field.name().equals(TypeFieldMapper.NAME)) {
continue;
} else {
if (!field.name().equals(UidFieldMapper.NAME) && !field.name().equals(TypeFieldMapper.NAME)) {
rootDoc.add(field);
}
}
}
private static ParseContext nestedContext(ParseContext context, ObjectMapper mapper) {
context = context.createNestedContext(mapper.fullPath());
ParseContext.Document nestedDoc = context.doc();
ParseContext.Document parentDoc = nestedDoc.getParent();
// pre add the uid field if possible (id was already provided)
IndexableField uidField = parentDoc.getField(UidFieldMapper.NAME);
if (uidField != null) {
// we don't need to add it as a full uid field in nested docs, since we don't need versioning
// we also rely on this for UidField#loadVersion
// this is a deeply nested field
nestedDoc.add(new Field(UidFieldMapper.NAME, uidField.stringValue(), UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
}
}
return update;
// the type of the nested doc starts with __, so we can identify that its a nested one in filters
// note, we don't prefix it with the type of the doc since it allows us to execute a nested query
// across types (for example, with similar nested objects)
nestedDoc.add(new Field(TypeFieldMapper.NAME, mapper.nestedTypePathAsString(), TypeFieldMapper.Defaults.FIELD_TYPE));
return context;
}
private static Mapper parseObjectOrField(ParseContext context, Mapper mapper) throws IOException {

View File

@ -117,9 +117,12 @@ public class JvmInfo implements Streamable, ToXContent {
Object useCompressedOopsVmOption = vmOptionMethod.invoke(hotSpotDiagnosticMXBean, "UseCompressedOops");
Method valueMethod = vmOptionClazz.getMethod("getValue");
info.useCompressedOops = (String)valueMethod.invoke(useCompressedOopsVmOption);
Object useG1GCVmOption = vmOptionMethod.invoke(hotSpotDiagnosticMXBean, "UseG1GC");
info.useG1GC = (String)valueMethod.invoke(useG1GCVmOption);
} catch (Throwable t) {
// unable to deduce the state of compressed oops
info.useCompressedOops = "unknown";
info.useG1GC = "unknown";
}
INSTANCE = info;
@ -158,6 +161,8 @@ public class JvmInfo implements Streamable, ToXContent {
private String useCompressedOops;
private String useG1GC;
private JvmInfo() {
}
@ -293,6 +298,10 @@ public class JvmInfo implements Streamable, ToXContent {
return this.useCompressedOops;
}
public String useG1GC() {
return this.useG1GC;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.JVM);

View File

@ -43,6 +43,9 @@ import org.elasticsearch.rest.action.support.RestActionListener;
import org.elasticsearch.rest.action.support.RestResponseListener;
import org.elasticsearch.rest.action.support.RestTable;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.rest.RestRequest.Method.GET;
public class RestNodeAttrsAction extends AbstractCatAction {
@ -112,6 +115,19 @@ public class RestNodeAttrsAction extends AbstractCatAction {
for (DiscoveryNode node : nodes) {
NodeInfo info = nodesInfo.getNodesMap().get(node.id());
for(ObjectObjectCursor<String, String> att : node.attributes()) {
buildRow(fullId, table, node, info, att.key, att.value);
}
if (info.getServiceAttributes() != null) {
for (Map.Entry<String, String> entry : info.getServiceAttributes().entrySet()) {
buildRow(fullId, table, node, info, entry.getKey(), entry.getValue());
}
}
}
return table;
}
private final void buildRow(boolean fullId, Table table, DiscoveryNode node, NodeInfo info, String key, String value) {
table.startRow();
table.addCell(node.name());
table.addCell(fullId ? node.id() : Strings.substring(node.getId(), 0, 4));
@ -123,12 +139,8 @@ public class RestNodeAttrsAction extends AbstractCatAction {
} else {
table.addCell("-");
}
table.addCell(att.key);
table.addCell(att.value);
table.addCell(key);
table.addCell(value);
table.endRow();
}
}
return table;
}
}

View File

@ -64,6 +64,7 @@ import org.elasticsearch.script.ScriptStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.rest.RestRequest.Method.GET;
@ -117,6 +118,7 @@ public class RestNodesAction extends AbstractCatAction {
table.addCell("pid", "default:false;alias:p;desc:process id");
table.addCell("ip", "alias:i;desc:ip address");
table.addCell("port", "default:false;alias:po;desc:bound transport port");
table.addCell("http_address", "default:false;alias:http;desc:bound http adress");
table.addCell("version", "default:false;alias:v;desc:es version");
table.addCell("build", "default:false;alias:b;desc:es build hash");
@ -247,6 +249,12 @@ public class RestNodesAction extends AbstractCatAction {
} else {
table.addCell("-");
}
final Map<String, String> serviceAttributes = info.getServiceAttributes();
if (serviceAttributes != null) {
table.addCell(serviceAttributes.getOrDefault("http_address", "-"));
} else {
table.addCell("-");
}
table.addCell(node.getVersion().toString());
table.addCell(info == null ? null : info.getBuild().shortHash());

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Table;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
@ -142,7 +143,7 @@ public class RestRecoveryAction extends AbstractCatAction {
t.startRow();
t.addCell(index);
t.addCell(state.getShardId().id());
t.addCell(state.getTimer().time());
t.addCell(new TimeValue(state.getTimer().time()));
t.addCell(state.getType().toString().toLowerCase(Locale.ROOT));
t.addCell(state.getStage().toString().toLowerCase(Locale.ROOT));
t.addCell(state.getSourceNode() == null ? "n/a" : state.getSourceNode().getHostName());

View File

@ -142,13 +142,22 @@ public class RestUtils {
* @throws IllegalArgumentException if the string contains a malformed
* escape sequence.
*/
@SuppressWarnings("fallthrough")
public static String decodeComponent(final String s, final Charset charset) {
if (s == null) {
return "";
}
final int size = s.length();
boolean modified = false;
if (!decodingNeeded(s, size)) {
return s;
}
final byte[] buf = new byte[size];
int pos = decode(s, size, buf);
return new String(buf, 0, pos, charset);
}
@SuppressWarnings("fallthrough")
private static boolean decodingNeeded(String s, int size) {
boolean decodingNeeded = false;
for (int i = 0; i < size; i++) {
final char c = s.charAt(i);
switch (c) {
@ -156,14 +165,15 @@ public class RestUtils {
i++; // We can skip at least one char, e.g. `%%'.
// Fall through.
case '+':
modified = true;
decodingNeeded = true;
break;
}
}
if (!modified) {
return s;
return decodingNeeded;
}
final byte[] buf = new byte[size];
@SuppressWarnings("fallthrough")
private static int decode(String s, int size, byte[] buf) {
int pos = 0; // position in `buf'.
for (int i = 0; i < size; i++) {
char c = s.charAt(i);
@ -173,16 +183,14 @@ public class RestUtils {
break;
case '%':
if (i == size - 1) {
throw new IllegalArgumentException("unterminated escape"
+ " sequence at end of string: " + s);
throw new IllegalArgumentException("unterminated escape sequence at end of string: " + s);
}
c = s.charAt(++i);
if (c == '%') {
buf[pos++] = '%'; // "%%" -> "%"
break;
} else if (i == size - 1) {
throw new IllegalArgumentException("partial escape"
+ " sequence at end of string: " + s);
throw new IllegalArgumentException("partial escape sequence at end of string: " + s);
}
c = decodeHexNibble(c);
final char c2 = decodeHexNibble(s.charAt(++i));
@ -199,7 +207,7 @@ public class RestUtils {
break;
}
}
return new String(buf, 0, pos, charset);
return pos;
}
/**

View File

@ -71,6 +71,28 @@ public class IndexRequestTests extends ESTestCase {
assertThat(request.validate().validationErrors(), not(empty()));
}
public void testIndexingRejectsLongIds() {
String id = randomAsciiOfLength(511);
IndexRequest request = new IndexRequest("index", "type", id);
request.source("{}");
ActionRequestValidationException validate = request.validate();
assertNull(validate);
id = randomAsciiOfLength(512);
request = new IndexRequest("index", "type", id);
request.source("{}");
validate = request.validate();
assertNull(validate);
id = randomAsciiOfLength(513);
request = new IndexRequest("index", "type", id);
request.source("{}");
validate = request.validate();
assertThat(validate, notNullValue());
assertThat(validate.getMessage(),
containsString("id is too long, must be no longer than 512 bytes but was: 513"));
}
public void testSetTTLAsTimeValue() {
IndexRequest indexRequest = new IndexRequest();
TimeValue ttl = TimeValue.parseTimeValue(randomTimeValue(), null, "ttl");

View File

@ -20,14 +20,17 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -39,6 +42,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
@ -337,6 +341,37 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
assertThat(result.routingTable().index(shard1.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0));
}
public void testRestoreDoesNotAllocateSnapshotOnOlderNodes() {
final DiscoveryNode newNode = new DiscoveryNode("newNode", DummyTransportAddress.INSTANCE, Version.CURRENT);
final DiscoveryNode oldNode1 = new DiscoveryNode("oldNode1", DummyTransportAddress.INSTANCE, VersionUtils.getPreviousVersion());
final DiscoveryNode oldNode2 = new DiscoveryNode("oldNode2", DummyTransportAddress.INSTANCE, VersionUtils.getPreviousVersion());
int numberOfShards = randomIntBetween(1, 3);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(numberOfShards).numberOfReplicas
(randomIntBetween(0, 3)))
.build();
ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"), new RestoreSource(new SnapshotId("rep1", "snp1"),
Version.CURRENT, "test")).build())
.nodes(DiscoveryNodes.builder().put(newNode).put(oldNode1).put(oldNode2)).build();
AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{
new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY),
new NodeVersionAllocationDecider(Settings.EMPTY)});
AllocationService strategy = new MockAllocationService(Settings.EMPTY,
allocationDeciders,
new ShardsAllocators(Settings.EMPTY, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true);
// Make sure that primary shards are only allocated on the new node
for (int i = 0; i < numberOfShards; i++) {
assertEquals("newNode", result.routingTable().index("test").getShards().get(i).primaryShard().currentNodeId());
}
}
private ClusterState stabilize(ClusterState clusterState, AllocationService service) {
logger.trace("RoutingNodes: {}", clusterState.getRoutingNodes().prettyPrint());

View File

@ -1,481 +0,0 @@
#!/usr/bin/env ruby
# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on
# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License
#
# NAME
# build_randomization.rb -- Generate property file for the JDK randomization test
#
# SYNOPSIS
# build_randomization.rb [-d] [-l|t]
#
# DESCRIPTION
# This script takes the randomization choices described in RANDOM_CHOICE and generates appropriate JAVA property file 'prop.txt'
# This property file also contain the appropriate JDK selection, randomized. JDK randomization is based on what is available on the Jenkins tools
# directory. This script is used by Jenkins test system to conduct Elasticsearch server randomization testing.
#
# In hash RANDOM_CHOICES, the key of randomization hash maps to key of java property. The value of the hash describes the possible value of the randomization
#
# For example RANDOM_CHOICES = { 'es.node.mode' => {:choices => ['local', 'network'], :method => :get_random_one} } means
# es.node.mode will be set to either 'local' or 'network', each with 50% of probability
#
# OPTIONS SUMMARY
# The options are as follows:
#
# -d, --debug Increase logging verbosity for debugging purpose
# -t, --test Run in test mode. The script will execute unit tests.
# -l, --local Run in local mode. In this mode, directory structure will be created under current directory to mimic
# Jenkins' server directory layout. This mode is mainly used for development.
require 'enumerator'
require 'getoptlong'
require 'log4r'
require 'optparse'
require 'rubygems'
require 'yaml'
include Log4r
RANDOM_CHOICES = {
'tests.jvm.argline' => [
{:choices => ['-server'], :method => 'get_random_one'},
{:choices => ['-XX:+UseConcMarkSweepGC', '-XX:+UseParallelGC', '-XX:+UseSerialGC', '-XX:+UseG1GC'], :method => 'get_random_one'},
{:choices => ['-XX:+UseCompressedOops', '-XX:-UseCompressedOops'], :method => 'get_random_one'},
{:choices => ['-XX:+AggressiveOpts'], :method => 'get_50_percent'}
],
'es.node.mode' => {:choices => ['local', 'network'], :method => 'get_random_one'},
# bug forced to be false for now :test_nightly => { :method => :true_or_false},
'tests.nightly' => {:selections => false},
'tests.heap.size' => {:choices => [512, 1024], :method => :random_heap},
'tests.assertion.disabled'=> {:choices => 'org.elasticsearch', :method => 'get_10_percent'},
'tests.security.manager' => {:choices => [true, false], :method => 'get_90_percent'},
}
L = Logger.new 'test_randomizer'
L.outputters = Outputter.stdout
L.level = INFO
C = {:local => false, :test => false}
OptionParser.new do |opts|
opts.banner = "Usage: build_randomization.rb [options]"
opts.on("-d", "--debug", "Debug mode") do |d|
L.level = DEBUG
end
opts.on("-l", "--local", "Run in local mode") do |l|
C[:local] = true
end
opts.on("-t", "--test", "Run unit tests") do |t|
C[:test] = true
end
end.parse!
class Randomizer
attr_accessor :data_array
def initialize(data_array)
@data_array = data_array
end
def true_or_false
[true, false][rand(2)]
end
def random_heap
inner_data_array = [data_array[0], data_array[1], data_array[0] + rand(data_array[1] - data_array[0])]
"%sm" % inner_data_array[rand(inner_data_array.size)]
end
def get_random_with_distribution(mdata_array, distribution)
L.debug "randomized distribution data %s" % YAML.dump(mdata_array)
L.debug "randomized distribution distribution %s" % YAML.dump(distribution)
carry = 0
distribution_map = distribution.enum_for(:each_with_index).map { |x,i| pre_carry = carry ; carry += x; {i => x + pre_carry} }
random_size = distribution_map.last.values.first
selection = rand(random_size)
#get the index that randomize choice mapped to
choice = distribution_map.select do |x|
x.values.first > selection #only keep the index with distribution value that is higher than the random generated number
end.first.keys.first #first hash's first key is the index we want
L.debug("randomized distribution choice %s" % mdata_array[choice])
mdata_array[choice]
end
def get_random_one
data_array[rand(data_array.size)]
end
def method_missing(meth, *args, &block)
# trap randomization based on percentage
if meth.to_s =~ /^get_(\d+)_percent/
percentage = $1.to_i
remain = 100 - percentage
#data = args.first
normalized_data = if(!data_array.kind_of?(Array))
[data_array, nil]
else
data_array
end
get_random_with_distribution(normalized_data, [percentage, remain])
else
super
end
end
end
class JDKSelector
attr_reader :directory, :jdk_list
def initialize(directory)
@directory = directory
end
# get selection of available JDKs from Jenkins automatic install directory
def get_jdk
@jdk_list = Dir.entries(directory).select do |x|
x.chars.first == 'J'
end.map do |y|
File.join(directory, y)
end
self
end
def filter_java_6(files)
files.select{ |i| File.basename(i).split(/[^0-9]/)[-1].to_i > 6 }
end
# do randomized selection from a given array
def select_one(selection_array = nil)
selection_array = filter_java_6(selection_array || @jdk_list)
Randomizer.new(selection_array).get_random_one
end
def JDKSelector.generate_jdk_hash(jdk_choice)
file_separator = if Gem.win_platform?
File::ALT_SEPARATOR
else
File::SEPARATOR
end
{
:PATH => [jdk_choice, 'bin'].join(file_separator) + File::PATH_SEPARATOR + ENV['PATH'],
:JAVA_HOME => jdk_choice
}
end
end
#
# Fix argument JDK selector
#
class FixedJDKSelector < JDKSelector
def initialize(directory)
@directory = [*directory] #selection of directories to pick from
end
def get_jdk
#since JDK selection is already specified..jdk list is the @directory
@jdk_list = @directory
self
end
def select_one(selection_array = nil)
#bypass filtering since this is not automatic
selection_array ||= @jdk_list
Randomizer.new(selection_array).get_random_one
end
end
#
# Property file writer
#
class PropertyWriter
attr_reader :working_directory
def initialize(mworking_directory)
@working_directory = mworking_directory
end
# # pick first element out of array of hashes, generate write java property file
def generate_property_file(data)
directory = working_directory
#array transformation
content = data.to_a.map do |x|
x.join('=')
end.sort
file_name = (ENV['BUILD_ID'] + ENV['BUILD_NUMBER']) || 'prop' rescue 'prop'
file_name = file_name.split(File::SEPARATOR).first + '.txt'
L.debug "Property file name is %s" % file_name
File.open(File.join(directory, file_name), 'w') do |file|
file.write(content.join("\n"))
end
end
end
#
# Execute randomization logics
#
class RandomizedRunner
attr_reader :random_choices, :jdk, :p_writer
def initialize(mrandom_choices, mjdk, mwriter)
@random_choices = mrandom_choices
@jdk = mjdk
@p_writer = mwriter
end
def generate_selections
configuration = random_choices
L.debug "Enter %s" % __method__
L.debug "Configuration %s" % YAML.dump(configuration)
generated = {}
configuration.each do |k, v|
if(v.kind_of?(Hash))
if(v.has_key?(:method))
randomizer = Randomizer.new(v[:choices])
v[:selections] = randomizer.__send__(v[:method])
end
else
v.each do |x|
if(x.has_key?(:method))
randomizer = Randomizer.new(x[:choices])
x[:selections] = randomizer.__send__(x[:method])
end
end
end
end.each do |k, v|
if(v.kind_of?(Array))
selections = v.inject([]) do |sum, current_hash|
sum.push(current_hash[:selections])
end
else
selections = [v[:selections]] unless v[:selections].nil?
end
generated[k] = selections unless (selections.nil? || selections.size == 0)
end
L.debug "Generated selections %s" % YAML.dump(generated)
generated
end
def get_env_matrix(jdk_selection, selections)
L.debug "Enter %s" % __method__
#normalization
s = {}
selections.each do |k, v|
if(v.size > 1)
s[k] = v.compact.join(' ') #this should be dependent on class of v[0] and perform reduce operation instead... good enough for now
else
s[k] = v.first
end
end
j = JDKSelector.generate_jdk_hash(jdk_selection)
# create build description line
desc = {}
# TODO: better error handling
desc[:BUILD_DESC] = "%s,%s,heap[%s],%s%s%s%s" % [
File.basename(j[:JAVA_HOME]),
s['es.node.mode'],
s['tests.heap.size'],
s['tests.nightly'] ? 'nightly,':'',
s['tests.jvm.argline'].gsub(/-XX:/,''),
s.has_key?('tests.assertion.disabled')? ',assert off' : '',
s['tests.security.manager'] ? ',sec manager on' : ''
]
result = j.merge(s).merge(desc)
L.debug(YAML.dump(result))
result
end
def run!
p_writer.generate_property_file(get_env_matrix(jdk, generate_selections))
end
end
#
# Main
#
unless(C[:test])
# Check to see if this is running locally
unless(C[:local])
L.debug("Normal Mode")
working_directory = ENV.fetch('WORKSPACE', (Gem.win_platform? ? Dir.pwd : '/var/tmp'))
else
L.debug("Local Mode")
test_directory = 'tools/hudson.model.JDK/'
unless(File.exist?(test_directory))
L.info "running local mode, setting up running environment"
L.info "properties are written to file prop.txt"
FileUtils.mkpath "%sJDK6" % test_directory
FileUtils.mkpath "%sJDK7" % test_directory
end
working_directory = Dir.pwd
end
# script support both window and linux
# TODO: refactor into platform/machine dependent class structure
jdk = if(Gem.win_platform?)
#window mode jdk directories are fixed
#TODO: better logic
L.debug("Window Mode")
if(File.directory?('y:\jdk7\7u55')) #old window system under ec2
FixedJDKSelector.new('y:\jdk7\7u55')
else #new metal window system
FixedJDKSelector.new(['c:\PROGRA~1\JAVA\jdk1.8.0_05', 'c:\PROGRA~1\JAVA\jdk1.7.0_55'])
end
else
#Jenkins sets pwd prior to execution
L.debug("Linux Mode")
JDKSelector.new(File.join(ENV['PWD'],'tools','hudson.model.JDK'))
end
runner = RandomizedRunner.new(RANDOM_CHOICES,
jdk.get_jdk.select_one,
PropertyWriter.new(working_directory))
environment_matrix = runner.run!
exit 0
else
require "test/unit"
end
#
# Test
#
class TestJDKSelector < Test::Unit::TestCase
L = Logger.new 'test'
L.outputters = Outputter.stdout
L.level = DEBUG
def test_hash_generator
jdk_choice = '/dummy/jdk7'
generated = JDKSelector.generate_jdk_hash(jdk_choice)
L.debug "Generated %s" % generated
assert generated[:PATH].include?(jdk_choice), "PATH doesn't included choice"
assert generated[:JAVA_HOME].include?(jdk_choice), "JAVA home doesn't include choice"
end
end
class TestFixJDKSelector < Test::Unit::TestCase
L = Logger.new 'test'
L.outputters = Outputter.stdout
L.level = DEBUG
def test_initialize
['/home/dummy', ['/JDK7', '/home2'], ['home/dummy']].each do |x|
test_object = FixedJDKSelector.new(x)
assert_kind_of Array, test_object.directory
assert_equal [*x], test_object.directory
end
end
def test_select_one
test_array = %w(one two three)
test_object = FixedJDKSelector.new(test_array)
assert test_array.include?(test_object.get_jdk.select_one)
end
def test_hash_generator
jdk_choice = '/dummy/jdk7'
generated = FixedJDKSelector.generate_jdk_hash(jdk_choice)
L.debug "Generated %s" % generated
assert generated[:PATH].include?(jdk_choice), "PATH doesn't included choice"
assert generated[:JAVA_HOME].include?(jdk_choice), "JAVA home doesn't include choice"
end
end
class TestPropertyWriter < Test::Unit::TestCase
L = Logger.new 'test'
L.outputters = Outputter.stdout
L.level = DEBUG
def test_initialize
['/home/dummy','/tmp'].each do |x|
test_object = PropertyWriter.new(x)
assert_kind_of String, test_object.working_directory
assert_equal x, test_object.working_directory
end
end
def test_generate_property
test_file = '/tmp/prop.txt'
File.delete(test_file) if File.exist?(test_file)
test_object = PropertyWriter.new(File.dirname(test_file))
# default prop.txt
test_object.generate_property_file({:hi => 'there'})
assert(File.exist?(test_file))
File.open(test_file, 'r') do |properties_file|
properties_file.read.each_line do |line|
line.strip!
assert_equal 'hi=there', line, "content %s is not hi=there" % line
end
end
File.delete(test_file) if File.exist?(test_file)
end
end
class DummyPropertyWriter < PropertyWriter
def generate_property_file(data)
L.debug "generating property file for %s" % YAML.dump(data)
L.debug "on directory %s" % working_directory
end
end
class TestRandomizedRunner < Test::Unit::TestCase
def test_initialize
test_object = RandomizedRunner.new(RANDOM_CHOICES, '/tmp/dummy/jdk', po = PropertyWriter.new('/tmp'))
assert_equal RANDOM_CHOICES, test_object.random_choices
assert_equal '/tmp/dummy/jdk', test_object.jdk
assert_equal po, test_object.p_writer
end
def test_generate_selection_no_method
test_object = RandomizedRunner.new({'tests.one' => {:selections => false }}, '/tmp/dummy/jdk', po = DummyPropertyWriter.new('/tmp'))
selection = test_object.generate_selections
assert_equal false, selection['tests.one'].first, 'randomization without selection method fails'
end
def test_generate_with_method
test_object = RandomizedRunner.new({'es.node.mode' => {:choices => ['local', 'network'], :method => 'get_random_one'}},
'/tmp/dummy/jdk', po = DummyPropertyWriter.new('/tmp'))
selection = test_object.generate_selections
assert ['local', 'network'].include?(selection['es.node.mode'].first), 'selection choice is not correct'
end
def test_get_env_matrix
test_object = RandomizedRunner.new(RANDOM_CHOICES,
'/tmp/dummy/jdk', po = DummyPropertyWriter.new('/tmp'))
selection = test_object.generate_selections
env_matrix = test_object.get_env_matrix('/tmp/dummy/jdk', selection)
puts YAML.dump(env_matrix)
assert_equal '/tmp/dummy/jdk', env_matrix[:JAVA_HOME]
end
end

View File

@ -1,24 +0,0 @@
# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on
# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
#
# This is used for client testings to pull in master, 090 bits
#
URL_MASTER=http://s3-us-west-2.amazonaws.com/build.elasticsearch.org/origin/master/nightly/JDK7/elasticsearch-latest-SNAPSHOT.zip
URL_1x=http://s3-us-west-2.amazonaws.com/build.elasticsearch.org/origin/1.x/nightly/JDK7/elasticsearch-latest-SNAPSHOT.zip
URL_11=http://s3-us-west-2.amazonaws.com/build.elasticsearch.org/origin/1.1/nightly/JDK6/elasticsearch-latest-SNAPSHOT.zip
URL_10=http://s3-us-west-2.amazonaws.com/build.elasticsearch.org/origin/1.0/nightly/JDK6/elasticsearch-latest-SNAPSHOT.zip
URL_090=http://s3-us-west-2.amazonaws.com/build.elasticsearch.org/origin/0.90/nightly/JDK6/elasticsearch-latest-SNAPSHOT.zip

View File

@ -1,65 +0,0 @@
# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on
# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
import sys
import argparse
try:
import boto.s3
except:
raise RuntimeError("""
S3 download requires boto to be installed
Use one of:
'pip install -U boto'
'apt-get install python-boto'
'easy_install boto'
""")
import boto.s3
def list_buckets(conn):
return conn.get_all_buckets()
def download_s3(conn, path, key, file, bucket):
print 'Downloading %s from Amazon S3 bucket %s/%s' % \
(file, bucket, os.path.join(path, key))
def percent_cb(complete, total):
sys.stdout.write('.')
sys.stdout.flush()
bucket = conn.get_bucket(bucket)
k = bucket.get_key(os.path.join(path, key))
k.get_contents_to_filename(file, cb=percent_cb, num_cb=100)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Downloads a bucket from Amazon S3')
parser.add_argument('--file', '-f', metavar='path to file',
help='path to store the bucket to', required=True)
parser.add_argument('--bucket', '-b', default='downloads.elasticsearch.org',
help='The S3 Bucket to download from')
parser.add_argument('--path', '-p', default='',
help='The key path to use')
parser.add_argument('--key', '-k', default=None,
help='The key - uses the file name as default key')
args = parser.parse_args()
if args.key:
key = args.key
else:
key = os.path.basename(args.file)
connection = boto.connect_s3()
download_s3(connection, args.path, key, args.file, args.bucket);

View File

@ -1,319 +0,0 @@
# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on
# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import random
import os
import tempfile
import shutil
import subprocess
import time
import argparse
import logging
import sys
import re
from datetime import datetime
try:
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionError
from elasticsearch.exceptions import TransportError
except ImportError as e:
print('Can\'t import elasticsearch please install `sudo pip install elasticsearch`')
raise e
'''This file executes a basic upgrade test by running a full cluster restart.
The upgrade test starts 2 or more nodes of an old elasticsearch version, indexes
a random number of documents into the running nodes and executes a full cluster restart.
After the nodes are recovered a small set of basic checks are executed to ensure all
documents are still searchable and field data can be loaded etc.
NOTE: This script requires the elasticsearch python client `elasticsearch-py` run the following command to install:
`sudo pip install elasticsearch`
if you are running python3 you need to install the client using pip3. On OSX `pip3` will be included in the Python 3.4
release available on `https://www.python.org/download/`:
`sudo pip3 install elasticsearch`
See `https://github.com/elasticsearch/elasticsearch-py` for details
In order to run this test two different version of elasticsearch are required. Both need to be unpacked into
the same directory:
```
$ cd /path/to/elasticsearch/clone
$ mkdir backwards && cd backwards
$ wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.3.1.tar.gz
$ wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-0.90.13.tar.gz
$ tar -zxvf elasticsearch-1.3.1.tar.gz && tar -zxvf elasticsearch-0.90.13.tar.gz
$ cd ..
$ python dev-tools/upgrade-tests.py --version.backwards 0.90.13 --version.current 1.3.1
```
'''
BLACK_LIST = {'1.2.0' : { 'reason': 'Contains a major bug where routing hashes are not consistent with previous version',
'issue': 'https://github.com/elasticsearch/elasticsearch/pull/6393'},
'1.3.0' : { 'reason': 'Lucene Related bug prevents upgrades from 0.90.7 and some earlier versions ',
'issue' : 'https://github.com/elasticsearch/elasticsearch/pull/7055'}}
# sometimes returns True
def rarely():
return random.randint(0, 10) == 0
# usually returns True
def frequently():
return not rarely()
# asserts the correctness of the given hits given they are sorted asc
def assert_sort(hits):
values = [hit['sort'] for hit in hits['hits']['hits']]
assert len(values) > 0, 'expected non emtpy result'
val = min(values)
for x in values:
assert x >= val, '%s >= %s' % (x, val)
val = x
# asserts that the cluster health didn't timeout etc.
def assert_health(cluster_health, num_shards, num_replicas):
assert cluster_health['timed_out'] == False, 'cluster health timed out %s' % cluster_health
# Starts a new elasticsearch node from a released & untared version.
# This node uses unicast discovery with the provided unicast host list and starts
# the nodes with the given data directory. This allows shutting down and starting up
# nodes on the same data dir simulating a full cluster restart.
def start_node(version, data_dir, node_dir, unicast_host_list, tcp_port, http_port):
es_run_path = os.path.join(node_dir, 'elasticsearch-%s' % (version), 'bin/elasticsearch')
if version.startswith('0.90.'):
foreground = '-f' # 0.90.x starts in background automatically
else:
foreground = ''
return subprocess.Popen([es_run_path,
'-Des.path.data=%s' % data_dir, '-Des.cluster.name=upgrade_test',
'-Des.discovery.zen.ping.unicast.hosts=%s' % unicast_host_list,
'-Des.transport.tcp.port=%s' % tcp_port,
'-Des.http.port=%s' % http_port,
foreground], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Indexes the given number of document into the given index
# and randomly runs refresh, optimize and flush commands
def index_documents(es, index_name, type, num_docs):
logging.info('Indexing %s docs' % num_docs)
for id in range(0, num_docs):
es.index(index=index_name, doc_type=type, id=id, body={'string': str(random.randint(0, 100)),
'long_sort': random.randint(0, 100),
'double_sort' : float(random.randint(0, 100))})
if rarely():
es.indices.refresh(index=index_name)
if rarely():
es.indices.flush(index=index_name, force=frequently())
if rarely():
es.indices.optimize(index=index_name)
es.indices.refresh(index=index_name)
# Runs a basic number of assertions including:
# - document counts
# - match all search with sort on double / long
# - Realtime GET operations
# TODO(simonw): we should add stuff like:
# - dates including sorting
# - string sorting
# - docvalues if available
# - global ordinal if available
def run_basic_asserts(es, index_name, type, num_docs):
count = es.count(index=index_name)['count']
assert count == num_docs, 'Expected %r but got %r documents' % (num_docs, count)
for _ in range(0, num_docs):
random_doc_id = random.randint(0, num_docs-1)
doc = es.get(index=index_name, doc_type=type, id=random_doc_id)
assert doc, 'Expected document for id %s but got %s' % (random_doc_id, doc)
assert_sort(es.search(index=index_name,
body={
'sort': [
{'double_sort': {'order': 'asc'}}
]
}))
assert_sort(es.search(index=index_name,
body={
'sort': [
{'long_sort': {'order': 'asc'}}
]
}))
# picks a random version or and entire random version tuple from the directory
# to run the backwards tests against.
def pick_random_upgrade_version(directory, lower_version=None, upper_version=None):
if lower_version and upper_version:
return lower_version, upper_version
assert os.path.isdir(directory), 'No such directory %s' % directory
versions = []
for version in map(lambda x : x[len('elasticsearch-'):], filter(lambda x : re.match(r'^elasticsearch-\d+[.]\d+[.]\d+$', x), os.listdir(directory))):
if not version in BLACK_LIST:
versions.append(build_tuple(version))
versions.sort()
if lower_version: # lower version is set - picking a higher one
versions = filter(lambda x : x > build_tuple(lower_version), versions)
assert len(versions) >= 1, 'Expected at least 1 higher version than %s version in %s ' % (lower_version, directory)
random.shuffle(versions)
return lower_version, build_version(versions[0])
if upper_version:
versions = filter(lambda x : x < build_tuple(upper_version), versions)
assert len(versions) >= 1, 'Expected at least 1 lower version than %s version in %s ' % (upper_version, directory)
random.shuffle(versions)
return build_version(versions[0]), upper_version
assert len(versions) >= 2, 'Expected at least 2 different version in %s but found %s' % (directory, len(versions))
random.shuffle(versions)
versions = versions[0:2]
versions.sort()
return build_version(versions[0]), build_version(versions[1])
def build_version(version_tuple):
return '.'.join([str(x) for x in version_tuple])
def build_tuple(version_string):
return [int(x) for x in version_string.split('.')]
# returns a new elasticsearch client and ensures the all nodes have joined the cluster
# this method waits at most 30 seconds for all nodes to join
def new_es_instance(num_nodes, http_port, timeout = 30):
logging.info('Waiting for %s nodes to join the cluster' % num_nodes)
for _ in range(0, timeout):
# TODO(simonw): ask Honza if there is a better way to do this?
try:
es = Elasticsearch([
{'host': '127.0.0.1', 'port': http_port + x}
for x in range(0, num_nodes)])
es.cluster.health(wait_for_nodes=num_nodes)
es.count() # can we actually search or do we get a 503? -- anyway retry
return es
except (ConnectionError, TransportError):
pass
time.sleep(1)
assert False, 'Timed out waiting for %s nodes for %s seconds' % (num_nodes, timeout)
def assert_versions(bwc_version, current_version, node_dir):
assert [int(x) for x in bwc_version.split('.')] < [int(x) for x in current_version.split('.')],\
'[%s] must be < than [%s]' % (bwc_version, current_version)
for version in [bwc_version, current_version]:
assert not version in BLACK_LIST, 'Version %s is blacklisted - %s, see %s' \
% (version, BLACK_LIST[version]['reason'],
BLACK_LIST[version]['issue'])
dir = os.path.join(node_dir, 'elasticsearch-%s' % current_version)
assert os.path.isdir(dir), 'Expected elasticsearch-%s install directory does not exists: %s' % (version, dir)
def full_cluster_restart(node_dir, current_version, bwc_version, tcp_port, http_port):
assert_versions(bwc_version, current_version, node_dir)
num_nodes = random.randint(2, 3)
nodes = []
data_dir = tempfile.mkdtemp()
logging.info('Running upgrade test from [%s] to [%s] seed: [%s] es.path.data: [%s] es.http.port [%s] es.tcp.port [%s]'
% (bwc_version, current_version, seed, data_dir, http_port, tcp_port))
try:
logging.info('Starting %s BWC nodes of version %s' % (num_nodes, bwc_version))
unicast_addresses = ','.join(['127.0.0.1:%s' % (tcp_port+x) for x in range(0, num_nodes)])
for id in range(0, num_nodes):
nodes.append(start_node(bwc_version, data_dir, node_dir, unicast_addresses, tcp_port+id, http_port+id))
es = new_es_instance(num_nodes, http_port)
es.indices.delete(index='test_index', ignore=404)
num_shards = random.randint(1, 10)
num_replicas = random.randint(0, 1)
logging.info('Create index with [%s] shards and [%s] replicas' % (num_shards, num_replicas))
es.indices.create(index='test_index', body={
# TODO(simonw): can we do more here in terms of randomization - seems hard due to all the different version
'settings': {
'number_of_shards': num_shards,
'number_of_replicas': num_replicas
}
})
logging.info('Nodes joined, waiting for green status')
health = es.cluster.health(wait_for_status='green', wait_for_relocating_shards=0)
assert_health(health, num_shards, num_replicas)
num_docs = random.randint(10, 100)
index_documents(es, 'test_index', 'test_type', num_docs)
logging.info('Run basic asserts before full cluster restart')
run_basic_asserts(es, 'test_index', 'test_type', num_docs)
logging.info('kill bwc nodes -- prepare upgrade')
for node in nodes:
node.terminate()
# now upgrade the nodes and rerun the checks
tcp_port = tcp_port + len(nodes) # bump up port to make sure we can claim them
http_port = http_port + len(nodes)
logging.info('Full Cluster restart starts upgrading to version [elasticsearch-%s] es.http.port [%s] es.tcp.port [%s]'
% (current_version, http_port, tcp_port))
nodes = []
unicast_addresses = ','.join(['127.0.0.1:%s' % (tcp_port+x) for x in range(0, num_nodes)])
for id in range(0, num_nodes+1): # one more to trigger relocation
nodes.append(start_node(current_version, data_dir, node_dir, unicast_addresses, tcp_port+id, http_port+id))
es = new_es_instance(num_nodes+1, http_port)
logging.info('Nodes joined, waiting for green status')
health = es.cluster.health(wait_for_status='green', wait_for_relocating_shards=0)
assert_health(health, num_shards, num_replicas)
run_basic_asserts(es, 'test_index', 'test_type', num_docs)
# by running the indexing again we try to catch possible mapping problems after the upgrade
index_documents(es, 'test_index', 'test_type', num_docs)
run_basic_asserts(es, 'test_index', 'test_type', num_docs)
logging.info("[SUCCESS] - all test passed upgrading from version [%s] to version [%s]" % (bwc_version, current_version))
finally:
for node in nodes:
node.terminate()
time.sleep(1) # wait a second until removing the data dirs to give the nodes a chance to shutdown
shutil.rmtree(data_dir) # remove the temp data dir
if __name__ == '__main__':
logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
datefmt='%Y-%m-%d %I:%M:%S %p')
logging.getLogger('elasticsearch').setLevel(logging.ERROR)
logging.getLogger('urllib3').setLevel(logging.WARN)
parser = argparse.ArgumentParser(description='Tests Full Cluster Restarts across major version')
parser.add_argument('--version.backwards', '-b', dest='backwards_version', metavar='V',
help='The elasticsearch version to upgrade from')
parser.add_argument('--version.current', '-c', dest='current_version', metavar='V',
help='The elasticsearch version to upgrade to')
parser.add_argument('--seed', '-s', dest='seed', metavar='N', type=int,
help='The random seed to use')
parser.add_argument('--backwards.dir', '-d', dest='bwc_directory', default='backwards', metavar='dir',
help='The directory to the backwards compatibility sources')
parser.add_argument('--tcp.port', '-p', dest='tcp_port', default=9300, metavar='port', type=int,
help='The port to use as the minimum port for TCP communication')
parser.add_argument('--http.port', '-t', dest='http_port', default=9200, metavar='port', type=int,
help='The port to use as the minimum port for HTTP communication')
parser.set_defaults(bwc_directory='backwards')
parser.set_defaults(seed=int(time.time()))
args = parser.parse_args()
node_dir = args.bwc_directory
current_version = args.current_version
bwc_version = args.backwards_version
seed = args.seed
random.seed(seed)
bwc_version, current_version = pick_random_upgrade_version(node_dir, bwc_version, current_version)
tcp_port = args.tcp_port
http_port = args.http_port
try:
full_cluster_restart(node_dir, current_version, bwc_version, tcp_port, http_port)
except:
logging.warn('REPRODUCE WITH: \n\t`python %s --version.backwards %s --version.current %s --seed %s --tcp.port %s --http.port %s`'
% (sys.argv[0], bwc_version, current_version, seed, tcp_port, http_port))
raise

View File

@ -1,67 +0,0 @@
# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on
# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
import sys
import argparse
try:
import boto.s3
except:
raise RuntimeError("""
S3 upload requires boto to be installed
Use one of:
'pip install -U boto'
'apt-get install python-boto'
'easy_install boto'
""")
import boto.s3
def list_buckets(conn):
return conn.get_all_buckets()
def upload_s3(conn, path, key, file, bucket):
print 'Uploading %s to Amazon S3 bucket %s/%s' % \
(file, bucket, os.path.join(path, key))
def percent_cb(complete, total):
sys.stdout.write('.')
sys.stdout.flush()
bucket = conn.create_bucket(bucket)
k = bucket.new_key(os.path.join(path, key))
k.set_contents_from_filename(file, cb=percent_cb, num_cb=100)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Uploads files to Amazon S3')
parser.add_argument('--file', '-f', metavar='path to file',
help='the branch to release from', required=True)
parser.add_argument('--bucket', '-b', metavar='B42', default='download.elasticsearch.org',
help='The S3 Bucket to upload to')
parser.add_argument('--path', '-p', metavar='elasticsearch/elasticsearch', default='elasticsearch/elasticsearch',
help='The key path to use')
parser.add_argument('--key', '-k', metavar='key', default=None,
help='The key - uses the file name as default key')
args = parser.parse_args()
if args.key:
key = args.key
else:
key = os.path.basename(args.file)
connection = boto.connect_s3()
upload_s3(connection, args.path, key, args.file, args.bucket);

View File

@ -102,6 +102,7 @@ descriptors percentage |1
|`file_desc.max` |`fdm`, `fileDescriptorMax` |No |Maximum number of file
descriptors |1024
|`load` |`l` |No |Most recent load average |0.22
|`cpu` | |No |Recent system CPU usage as percent |12
|`uptime` |`u` |No |Node uptime |17.3m
|`node.role` |`r`, `role`, `dc`, `nodeRole` |Yes |Data node (d); Client
node (c) |d

View File

@ -15,10 +15,12 @@ are no shards in transit from one node to another:
[source,sh]
----------------------------------------------------------------------------
> curl -XGET 'localhost:9200/_cat/recovery?v'
index shard time type stage source target files percent bytes percent
wiki 0 73 store done hostA hostA 36 100.0% 24982806 100.0%
wiki 1 245 store done hostA hostA 33 100.0% 24501912 100.0%
wiki 2 230 store done hostA hostA 36 100.0% 30267222 100.0%
index shard time type stage source_host target_host repository snapshot files files_percent bytes bytes_percent total_files total_bytes translog translog_percent total_translog
index 0 87ms store done 127.0.0.1 127.0.0.1 n/a n/a 0 0.0% 0 0.0% 0 0 0 100.0% 0
index 1 97ms store done 127.0.0.1 127.0.0.1 n/a n/a 0 0.0% 0 0.0% 0 0 0 100.0% 0
index 2 93ms store done 127.0.0.1 127.0.0.1 n/a n/a 0 0.0% 0 0.0% 0 0 0 100.0% 0
index 3 90ms store done 127.0.0.1 127.0.0.1 n/a n/a 0 0.0% 0 0.0% 0 0 0 100.0% 0
index 4 9ms store done 127.0.0.1 127.0.0.1 n/a n/a 0 0.0% 0 0.0% 0 0 0 100.0% 0
---------------------------------------------------------------------------
In the above case, the source and target nodes are the same because the recovery
@ -33,14 +35,14 @@ what a live shard recovery looks like.
> curl -XPUT 'localhost:9200/wiki/_settings' -d'{"number_of_replicas":1}'
{"acknowledged":true}
> curl -XGET 'localhost:9200/_cat/recovery?v'
index shard time type stage source target files percent bytes percent
wiki 0 1252 store done hostA hostA 4 100.0% 23638870 100.0%
wiki 0 1672 replica index hostA hostB 4 75.0% 23638870 48.8%
wiki 1 1698 replica index hostA hostB 4 75.0% 23348540 49.4%
wiki 1 4812 store done hostA hostA 33 100.0% 24501912 100.0%
wiki 2 1689 replica index hostA hostB 4 75.0% 28681851 40.2%
wiki 2 5317 store done hostA hostA 36 100.0% 30267222 100.0%
> curl -XGET 'localhost:9200/_cat/recovery?v&h=i,s,t,ty,st,shost,thost,f,fp,b,bp'
i s t ty st shost thost f fp b bp
wiki 0 1252ms store done hostA hostA 4 100.0% 23638870 100.0%
wiki 0 1672ms replica index hostA hostB 4 75.0% 23638870 48.8%
wiki 1 1698ms replica index hostA hostB 4 75.0% 23348540 49.4%
wiki 1 4812ms store done hostA hostA 33 100.0% 24501912 100.0%
wiki 2 1689ms replica index hostA hostB 4 75.0% 28681851 40.2%
wiki 2 5317ms store done hostA hostA 36 100.0% 30267222 100.0%
----------------------------------------------------------------------------
We can see in the above listing that our 3 initial shards are in various stages
@ -55,13 +57,13 @@ API.
--------------------------------------------------------------------------------
> curl -XPOST 'localhost:9200/_snapshot/imdb/snapshot_2/_restore'
{"acknowledged":true}
> curl -XGET 'localhost:9200/_cat/recovery?v'
index shard time type stage repository snapshot files percent bytes percent
imdb 0 1978 snapshot done imdb snap_1 79 8.0% 12086 9.0%
imdb 1 2790 snapshot index imdb snap_1 88 7.7% 11025 8.1%
imdb 2 2790 snapshot index imdb snap_1 85 0.0% 12072 0.0%
imdb 3 2796 snapshot index imdb snap_1 85 2.4% 12048 7.2%
imdb 4 819 snapshot init imdb snap_1 0 0.0% 0 0.0%
> curl -XGET 'localhost:9200/_cat/recovery?v&h=i,s,t,ty,st,rep,snap,f,fp,b,bp'
i s t ty st rep snap f fp b bp
imdb 0 1978ms snapshot done imdb snap_1 79 8.0% 12086 9.0%
imdb 1 2790ms snapshot index imdb snap_1 88 7.7% 11025 8.1%
imdb 2 2790ms snapshot index imdb snap_1 85 0.0% 12072 0.0%
imdb 3 2796ms snapshot index imdb snap_1 85 2.4% 12048 7.2%
imdb 4 819ms snapshot init imdb snap_1 0 0.0% 0 0.0%
--------------------------------------------------------------------------------

View File

@ -97,6 +97,11 @@ characteristics as the former `scan` search type.
[[breaking_30_rest_api_changes]]
=== REST API changes
==== id values longer than 512 bytes are rejected
When specifying an `_id` value longer than 512 bytes, the request will be
rejected.
==== search exists api removed
The search exists api has been removed in favour of using the search api with
@ -311,6 +316,13 @@ Elasticsearch process has been removed. This same information can be
obtained from the <<cluster-nodes-info>> API, and a warning is logged
on startup if it is set too low.
==== Removed es.netty.gathering
Disabling Netty from using NIO gathering could be done via the escape
hatch of setting the system property "es.netty.gathering" to "false".
Time has proven enabling gathering by default is a non-issue and this
non-documented setting has been removed.
[[breaking_30_mapping_changes]]
=== Mapping changes

View File

@ -48,3 +48,12 @@
$body: |
/^ file_desc\.current \s+ file_desc\.percent \s+ file_desc\.max \n
(\s+ (-1|\d+) \s+ \d+ \s+ (-1|\d+) \n)+ $/
- do:
cat.nodes:
h: http
v: true
- match:
$body: |
/^ http \n ((\d{1,3}\.){3}\d{1,3}:\d{1,5}\n)+ $/

View File

@ -27,7 +27,7 @@
(
index1 \s+
\d \s+ # shard
\d+ \s+ # time
\d+ms \s+ # time
(store|replica|snapshot|relocating) \s+ # type
(init|index|verify_index|translog|finalize|done) \s+ # stage
[-\w./]+ \s+ # source_host

View File

@ -24,3 +24,11 @@
- match: { _id: "1"}
- match: { _version: 1}
- match: { _source: { foo: bar }}
- do:
catch: request
index:
index: idx
type: type
id: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
body: { foo: bar }