Optimize the order of bytes in uuids for better compression. (#24615)

Flake ids organize bytes in such a way that ids are ordered. However, we do not
need that property and could reorganize bytes in an order that would better suit
Lucene's terms dict instead.

Some synthetic tests suggest that this change decreases the disk footprint of
the `_id` field by about 50% in many cases (see `UUIDTests.testCompression`).
For instance, when simulating the indexing of 10M docs at a rate of 10k docs
per second, the current uid generator used 20.2 bytes per document on average,
while the new generator, which merely writes the same bytes in a different
order, used only 9.6 bytes per document on average.

We had already explored this idea in #18209, but that attempt to share long
common prefixes hurt indexing speed. This time I have been more careful to put
discriminant bytes early in the `_id`, in a way that keeps indexing speed on
par with today's while still allowing for better compression.
Author: Adrien Grand
Date:   2017-07-11 17:28:23 +02:00 (committed by GitHub)
Commit: f9fbce84b6, parent a3ade99fcf

14 changed files with 235 additions and 34 deletions
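The essence of the change: both generators pack the same 15 bytes (6 low-order timestamp bytes, a 6-byte munged MAC address, and 3 sequence-number bytes), just in a different order. Below is a condensed sketch of the two layouts, written for this summary rather than taken from the commit; the real implementations are LegacyTimeBasedUUIDGenerator and TimeBasedUUIDGenerator in the diffs that follow.

import java.util.Base64;

// Illustrative only -- mirrors the layouts in the diffs below.
// ts = timestamp in ms, mac = 6-byte munged MAC address, seq = sequence number.
public class IdLayouts {

    // Legacy layout: most significant timestamp bytes first, so raw ids sort by time.
    static byte[] legacy(long ts, byte[] mac, int seq) {
        byte[] b = new byte[15];
        for (int i = 0; i < 6; i++) b[i] = (byte) (ts >>> (8 * (5 - i)));       // timestamp, big-endian
        System.arraycopy(mac, 0, b, 6, 6);                                      // MAC address
        for (int i = 0; i < 3; i++) b[12 + i] = (byte) (seq >>> (8 * (2 - i))); // sequence number
        return b;
    }

    // New layout: discriminant bytes first (fast sorting while indexing),
    // slowly-changing bytes next (long shared prefixes compress well in the
    // terms dictionary), fast-changing leftovers last.
    static byte[] reordered(long ts, byte[] mac, int seq) {
        return new byte[] {
            (byte) seq,          // changes every doc
            (byte) (seq >>> 16), // changes every 65k docs
            (byte) (ts >>> 16),  // ~65 seconds
            (byte) (ts >>> 24),  // ~4.5 hours
            (byte) (ts >>> 32),  // ~50 days
            (byte) (ts >>> 40),  // ~35 years
            mac[0], mac[1], mac[2], mac[3], mac[4], mac[5],
            (byte) (ts >>> 8),   // fast-changing remainder
            (byte) (seq >>> 8),
            (byte) ts,
        };
    }

    public static void main(String[] args) {
        byte[] mac = {1, 2, 3, 4, 5, 6};
        Base64.Encoder enc = Base64.getUrlEncoder().withoutPadding();
        System.out.println(enc.encodeToString(legacy(System.currentTimeMillis(), mac, 42)));
        System.out.println(enc.encodeToString(reordered(System.currentTimeMillis(), mac, 42)));
    }
}

The raw legacy bytes sort by generation time; the reordered layout gives that property up so that the slowly changing bytes form long shared prefixes in Lucene's terms dictionary.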

TransportBulkAction.java

@@ -25,6 +25,7 @@ import org.apache.lucene.util.SparseFixedBitSet;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceAlreadyExistsException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.RoutingMissingException;
@@ -289,13 +290,11 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                 case CREATE:
                 case INDEX:
                     IndexRequest indexRequest = (IndexRequest) docWriteRequest;
-                    MappingMetaData mappingMd = null;
                     final IndexMetaData indexMetaData = metaData.index(concreteIndex);
-                    if (indexMetaData != null) {
-                        mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
-                    }
+                    MappingMetaData mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
+                    Version indexCreated = indexMetaData.getCreationVersion();
                     indexRequest.resolveRouting(metaData);
-                    indexRequest.process(mappingMd, concreteIndex.getName());
+                    indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
                     break;
                 case UPDATE:
                     TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);

TransportShardBulkAction.java

@@ -350,7 +350,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             case UPDATED:
                 IndexRequest indexRequest = translate.action();
                 MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
-                indexRequest.process(mappingMd, concreteIndex);
+                indexRequest.process(metaData.getCreationVersion(), mappingMd, concreteIndex);
                 result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
                 break;
             case DELETED:

IndexRequest.java

@@ -43,6 +43,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.shard.ShardId;
@@ -484,7 +485,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
     }

-    public void process(@Nullable MappingMetaData mappingMd, String concreteIndex) {
+    public void process(Version indexCreatedVersion, @Nullable MappingMetaData mappingMd, String concreteIndex) {
         if (mappingMd != null) {
             // might as well check for routing here
             if (mappingMd.routing().required() && routing == null) {
@@ -508,7 +509,13 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         if (id == null) {
             assert autoGeneratedTimestamp == -1 : "timestamp has already been generated!";
             autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia
-            id(UUIDs.base64UUID());
+            String uid;
+            if (indexCreatedVersion.onOrAfter(Version.V_6_0_0_beta1)) {
+                uid = UUIDs.base64UUID();
+            } else {
+                uid = UUIDs.legacyBase64UUID();
+            }
+            id(uid);
         }
     }
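The generator is chosen from the version the index was created on, not the version of the node executing the request, so an existing index keeps a single id flavor across a rolling upgrade. Version#onOrAfter makes the cutover inclusive; a tiny hypothetical check, assuming the elasticsearch core jar on the classpath:

import org.elasticsearch.Version;

// Hypothetical demo of the gate used by IndexRequest#process above.
public class VersionGateDemo {
    public static void main(String[] args) {
        // An index created on 5.x keeps getting legacy, time-ordered ids:
        System.out.println(Version.V_5_0_0.onOrAfter(Version.V_6_0_0_beta1));       // false
        // An index created on exactly 6.0.0-beta1 already gets the new layout:
        System.out.println(Version.V_6_0_0_beta1.onOrAfter(Version.V_6_0_0_beta1)); // true
    }
}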

LegacyTimeBasedUUIDGenerator.java (new file)

@@ -0,0 +1,86 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common;
+
+import java.util.Base64;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * These are essentially flake ids (http://boundary.com/blog/2012/01/12/flake-a-decentralized-k-ordered-unique-id-generator-in-erlang) but
+ * we use 6 (not 8) bytes for timestamp, and use 3 (not 2) bytes for sequence number.
+ */
+class LegacyTimeBasedUUIDGenerator implements UUIDGenerator {
+
+    // We only use bottom 3 bytes for the sequence number. Paranoia: init with random int so that if JVM/OS/machine goes down, clock slips
+    // backwards, and JVM comes back up, we are less likely to be on the same sequenceNumber at the same time:
+    private final AtomicInteger sequenceNumber = new AtomicInteger(SecureRandomHolder.INSTANCE.nextInt());
+
+    // Used to ensure clock moves forward:
+    private long lastTimestamp;
+
+    private static final byte[] SECURE_MUNGED_ADDRESS = MacAddressProvider.getSecureMungedAddress();
+
+    static {
+        assert SECURE_MUNGED_ADDRESS.length == 6;
+    }
+
+    /** Puts the lower numberOfLongBytes from l into the array, starting index pos. */
+    private static void putLong(byte[] array, long l, int pos, int numberOfLongBytes) {
+        for (int i=0; i<numberOfLongBytes; ++i) {
+            array[pos+numberOfLongBytes-i-1] = (byte) (l >>> (i*8));
+        }
+    }
+
+    @Override
+    public String getBase64UUID() {
+        final int sequenceId = sequenceNumber.incrementAndGet() & 0xffffff;
+        long timestamp = System.currentTimeMillis();
+
+        synchronized (this) {
+            // Don't let timestamp go backwards, at least "on our watch" (while this JVM is running). We are still vulnerable if we are
+            // shut down, clock goes backwards, and we restart... for this we randomize the sequenceNumber on init to decrease chance of
+            // collision:
+            timestamp = Math.max(lastTimestamp, timestamp);
+
+            if (sequenceId == 0) {
+                // Always force the clock to increment whenever sequence number is 0, in case we have a long time-slip backwards:
+                timestamp++;
+            }
+
+            lastTimestamp = timestamp;
+        }
+
+        final byte[] uuidBytes = new byte[15];
+
+        // Only use lower 6 bytes of the timestamp (this will suffice beyond the year 10000):
+        putLong(uuidBytes, timestamp, 0, 6);
+
+        // MAC address adds 6 bytes:
+        System.arraycopy(SECURE_MUNGED_ADDRESS, 0, uuidBytes, 6, SECURE_MUNGED_ADDRESS.length);
+
+        // Sequence number adds 3 bytes:
+        putLong(uuidBytes, sequenceId, 12, 3);
+
+        assert 9 + SECURE_MUNGED_ADDRESS.length == uuidBytes.length;
+
+        return Base64.getUrlEncoder().withoutPadding().encodeToString(uuidBytes);
+    }
+}
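Note the sequenceId == 0 branch: every time the 3-byte sequence wraps, the timestamp is forced forward, so within one JVM a (timestamp, sequence) pair can never repeat even if the wall clock stalls or slips backwards. A standalone toy model of that invariant, written for this summary (single-threaded, and the sequence starts at 0 instead of a random int):

// Toy model of the clock handling in getBase64UUID(): with a frozen wall
// clock, the (timestamp, sequence) pair is still strictly increasing.
public class ClockInvariantDemo {
    public static void main(String[] args) {
        final long frozenClock = 1_499_787_000_000L; // wall clock never advances
        long lastTimestamp = 0;
        int sequenceNumber = 0;
        long previousKey = -1;
        for (long n = 0; n < (1L << 25); n++) { // two full wraps of the 3-byte sequence
            int sequenceId = ++sequenceNumber & 0xffffff;
            long timestamp = Math.max(lastTimestamp, frozenClock);
            if (sequenceId == 0) {
                timestamp++; // force the clock forward whenever the sequence wraps
            }
            lastTimestamp = timestamp;
            // pack (timestamp - base, sequence) into one comparable long
            long key = ((timestamp - frozenClock) << 24) | sequenceId;
            if (key <= previousKey) {
                throw new AssertionError("duplicate or out-of-order id at n=" + n);
            }
            previousKey = key;
        }
        System.out.println("all ids unique and increasing");
    }
}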

TimeBasedUUIDGenerator.java

@@ -23,7 +23,8 @@ import java.util.Base64;
 import java.util.concurrent.atomic.AtomicInteger;

 /** These are essentially flake ids (http://boundary.com/blog/2012/01/12/flake-a-decentralized-k-ordered-unique-id-generator-in-erlang) but
- * we use 6 (not 8) bytes for timestamp, and use 3 (not 2) bytes for sequence number. */
+ * we use 6 (not 8) bytes for timestamp, and use 3 (not 2) bytes for sequence number. We also reorder bytes in a way that does not make ids
+ * sort in order anymore, but is more friendly to the way that the Lucene terms dictionary is structured. */
 class TimeBasedUUIDGenerator implements UUIDGenerator {
@@ -40,17 +41,20 @@ class TimeBasedUUIDGenerator implements UUIDGenerator {
         assert SECURE_MUNGED_ADDRESS.length == 6;
     }

-    /** Puts the lower numberOfLongBytes from l into the array, starting index pos. */
-    private static void putLong(byte[] array, long l, int pos, int numberOfLongBytes) {
-        for (int i=0; i<numberOfLongBytes; ++i) {
-            array[pos+numberOfLongBytes-i-1] = (byte) (l >>> (i*8));
-        }
+    // protected for testing
+    protected long currentTimeMillis() {
+        return System.currentTimeMillis();
+    }
+
+    // protected for testing
+    protected byte[] macAddress() {
+        return SECURE_MUNGED_ADDRESS;
     }

     @Override
     public String getBase64UUID() {
         final int sequenceId = sequenceNumber.incrementAndGet() & 0xffffff;
-        long timestamp = System.currentTimeMillis();
+        long timestamp = currentTimeMillis();

         synchronized (this) {
             // Don't let timestamp go backwards, at least "on our watch" (while this JVM is running). We are still vulnerable if we are
@@ -67,17 +71,45 @@ class TimeBasedUUIDGenerator implements UUIDGenerator {
         }

         final byte[] uuidBytes = new byte[15];
+        int i = 0;

-        // Only use lower 6 bytes of the timestamp (this will suffice beyond the year 10000):
-        putLong(uuidBytes, timestamp, 0, 6);
+        // We have auto-generated ids, which are usually used for append-only workloads.
+        // So we try to optimize the order of bytes for indexing speed (by having quite
+        // unique bytes close to the beginning of the ids so that sorting is fast) and
+        // compression (by making sure we share common prefixes between enough ids),
+        // but not necessarily for lookup speed (by having the leading bytes identify
+        // segments whenever possible)

-        // MAC address adds 6 bytes:
-        System.arraycopy(SECURE_MUNGED_ADDRESS, 0, uuidBytes, 6, SECURE_MUNGED_ADDRESS.length);
+        // Blocks in the block tree have between 25 and 48 terms. So all prefixes that
+        // are shared by ~30 terms should be well compressed. I first tried putting the
+        // two lower bytes of the sequence id in the beginning of the id, but compression
+        // is only triggered when you have at least 30*2^16 ~= 2M documents in a segment,
+        // which is already quite large. So instead, we are putting the 1st and 3rd byte
+        // of the sequence number so that compression starts to be triggered with smaller
+        // segment sizes and still gives pretty good indexing speed. We use the sequenceId
+        // rather than the timestamp because the distribution of the timestamp depends too
+        // much on the indexing rate, so it is less reliable.
+        uuidBytes[i++] = (byte) sequenceId;
+        // changes every 65k docs, so potentially every second if you have a steady indexing rate
+        uuidBytes[i++] = (byte) (sequenceId >>> 16);

-        // Sequence number adds 3 bytes:
-        putLong(uuidBytes, sequenceId, 12, 3);
+        // Now we start focusing on compression and put bytes that should not change too often.
+        uuidBytes[i++] = (byte) (timestamp >>> 16); // changes every ~65 secs
+        uuidBytes[i++] = (byte) (timestamp >>> 24); // changes every ~4.5h
+        uuidBytes[i++] = (byte) (timestamp >>> 32); // changes every ~50 days
+        uuidBytes[i++] = (byte) (timestamp >>> 40); // changes every 35 years
+        byte[] macAddress = macAddress();
+        assert macAddress.length == 6;
+        System.arraycopy(macAddress, 0, uuidBytes, i, macAddress.length);
+        i += macAddress.length;

-        assert 9 + SECURE_MUNGED_ADDRESS.length == uuidBytes.length;
+        // Finally we put the remaining bytes, which will likely not be compressed at all.
+        uuidBytes[i++] = (byte) (timestamp >>> 8);
+        uuidBytes[i++] = (byte) (sequenceId >>> 8);
+        uuidBytes[i++] = (byte) timestamp;
+
+        assert i == uuidBytes.length;

         return Base64.getUrlEncoder().withoutPadding().encodeToString(uuidBytes);
     }
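The rollover periods claimed in the comments above are simply powers of two of a millisecond (byte k of the timestamp changes once every 2^(8k) ms), and 30 * 2^16 is about 1.97M, matching the "2M documents" figure. A quick standalone sanity check of those numbers:

// Byte k of the timestamp (timestamp >>> 8k) changes once every 2^(8k) ms.
public class RolloverPeriods {
    public static void main(String[] args) {
        final double msPerHour = 3_600_000.0;
        System.out.printf("ts>>>16 changes every %.1f seconds%n", (1L << 16) / 1_000.0);              // ~65.5 s
        System.out.printf("ts>>>24 changes every %.2f hours%n", (1L << 24) / msPerHour);              // ~4.66 h
        System.out.printf("ts>>>32 changes every %.1f days%n", (1L << 32) / (24 * msPerHour));        // ~49.7 days
        System.out.printf("ts>>>40 changes every %.1f years%n", (1L << 40) / (24 * 365 * msPerHour)); // ~34.9 years
    }
}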

UUIDs.java

@@ -24,6 +24,7 @@ import java.util.Random;
 public class UUIDs {

     private static final RandomBasedUUIDGenerator RANDOM_UUID_GENERATOR = new RandomBasedUUIDGenerator();
+    private static final UUIDGenerator LEGACY_TIME_UUID_GENERATOR = new LegacyTimeBasedUUIDGenerator();
     private static final UUIDGenerator TIME_UUID_GENERATOR = new TimeBasedUUIDGenerator();

     /** Generates a time-based UUID (similar to Flake IDs), which is preferred when generating an ID to be indexed into a Lucene index as
@@ -32,6 +33,11 @@
         return TIME_UUID_GENERATOR.getBase64UUID();
     }

+    /** Legacy implementation of {@link #base64UUID()}, for pre 6.0 indices. */
+    public static String legacyBase64UUID() {
+        return LEGACY_TIME_UUID_GENERATOR.getBase64UUID();
+    }
+
     /** Returns a Base64 encoded version of a Version 4.0 compatible UUID as defined here: http://www.ietf.org/rfc/rfc4122.txt, using the
      * provided {@code Random} instance */
     public static String randomBase64UUID(Random random) {
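Callers now pick between three factories. A minimal usage sketch, assuming the elasticsearch core jar on the classpath (output is random by nature):

import org.elasticsearch.common.UUIDs;

public class UUIDsDemo {
    public static void main(String[] args) {
        // New time-based layout: what auto-generated _ids use on 6.0+ indices.
        String timeBased = UUIDs.base64UUID();
        // Pre-6.0 layout: only for _ids in indices created before 6.0.0-beta1.
        String legacy = UUIDs.legacyBase64UUID();
        // Random (RFC 4122 type 4 style): for ids that are never indexed as
        // Lucene terms, such as the sync-flush and recovery ids switched below.
        String random = UUIDs.randomBase64UUID();
        System.out.println(timeBased + " / " + legacy + " / " + random);
    }
}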

SyncedFlushService.java

@@ -215,7 +215,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
             actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary"));
         } else {
             // 3. now send the sync request to all the shards
-            String syncId = UUIDs.base64UUID();
+            String syncId = UUIDs.randomBase64UUID();
             sendSyncRequests(syncId, activeShards, state, commitIds, shardId, totalShards, actionListener);
         }
     }

RecoveryTarget.java

@@ -120,7 +120,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
         this.indexShard = indexShard;
         this.sourceNode = sourceNode;
         this.shardId = indexShard.shardId();
-        this.tempFilePrefix = RECOVERY_PREFIX + UUIDs.base64UUID() + ".";
+        this.tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + ".";
         this.store = indexShard.store();
         this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;
         // make sure the store is not released until we are done.

IndexRequestTests.java

@@ -122,10 +122,10 @@ public class IndexRequestTests extends ESTestCase {
     public void testAutoGenIdTimestampIsSet() {
         IndexRequest request = new IndexRequest("index", "type");
-        request.process(null, "index");
+        request.process(Version.CURRENT, null, "index");
         assertTrue("expected > 0 but got: " + request.getAutoGeneratedTimestamp(), request.getAutoGeneratedTimestamp() > 0);
         request = new IndexRequest("index", "type", "1");
-        request.process(null, "index");
+        request.process(Version.CURRENT, null, "index");
         assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, request.getAutoGeneratedTimestamp());
     }

UUIDTests.java

@@ -18,9 +18,24 @@
  */
 package org.elasticsearch.common;

+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.store.Directory;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.Matchers;

 import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;

 public class UUIDTests extends ESTestCase {
@@ -100,4 +115,61 @@ public class UUIDTests extends ESTestCase {
         }
         assertEquals(count*uuids, globalSet.size());
     }
+
+    public void testCompression() throws Exception {
+        Logger logger = Loggers.getLogger(UUIDTests.class);
+        // Low number so that the test runs quickly, but the results are more interesting with larger numbers
+        // of indexed documents
+        assertThat(testCompression(500000, 10000, 3, logger), Matchers.lessThan(12d)); // ~10.8 in practice
+        assertThat(testCompression(500000, 1000, 3, logger), Matchers.lessThan(13d)); // ~11.5 in practice
+        assertThat(testCompression(500000, 100, 3, logger), Matchers.lessThan(21d)); // ~19.5 in practice
+    }
+
+    private static double testCompression(int numDocs, int numDocsPerSecond, int numNodes, Logger logger) throws Exception {
+        final double intervalBetweenDocs = 1000. / numDocsPerSecond; // milliseconds
+        final byte[][] macAddresses = new byte[numNodes][];
+        Random r = random();
+        for (int i = 0; i < macAddresses.length; ++i) {
+            macAddresses[i] = new byte[6];
+            random().nextBytes(macAddresses[i]);
+        }
+        UUIDGenerator generator = new TimeBasedUUIDGenerator() {
+            double currentTimeMillis = System.currentTimeMillis();
+
+            @Override
+            protected long currentTimeMillis() {
+                currentTimeMillis += intervalBetweenDocs * 2 * r.nextDouble();
+                return (long) currentTimeMillis;
+            }
+
+            @Override
+            protected byte[] macAddress() {
+                return RandomPicks.randomFrom(r, macAddresses);
+            }
+        };
+        // Avoid randomization which will slow down things without improving
+        // the quality of this test
+        Directory dir = newFSDirectory(createTempDir());
+        IndexWriter w = new IndexWriter(dir, new IndexWriterConfig().setOpenMode(OpenMode.CREATE));
+        Document doc = new Document();
+        StringField id = new StringField("_id", "", Store.NO);
+        doc.add(id);
+        long start = System.nanoTime();
+        for (int i = 0; i < numDocs; ++i) {
+            id.setStringValue(generator.getBase64UUID());
+            w.addDocument(doc);
+        }
+        w.forceMerge(1);
+        long time = (System.nanoTime() - start) / 1000 / 1000;
+        w.close();
+        long size = 0;
+        for (String file : dir.listAll()) {
+            size += dir.fileLength(file);
+        }
+        dir.close();
+        double bytesPerDoc = (double) size / numDocs;
+        logger.info(numDocs + " docs indexed at " + numDocsPerSecond + " docs/s required " + new ByteSizeValue(size)
+                + " bytes of disk space, or " + bytesPerDoc + " bytes per document. Took: " + new TimeValue(time) + ".");
+        return bytesPerDoc;
+    }
 }
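The spread in the assertions above (about 10.8 bytes per document at 10k docs/s versus about 19.5 at 100 docs/s) follows from the layout: the slower the indexing rate, the more simulated time the same number of documents spans, so the slowly changing timestamp bytes roll over more often per document and ids share shorter prefixes. A back-of-the-envelope check, illustrative rather than part of the commit:

// How often the first slowly-changing timestamp byte (ts>>>16) rolls over
// during the three testCompression runs (500k docs each).
public class RateVsPrefix {
    public static void main(String[] args) {
        final int numDocs = 500_000;
        for (int docsPerSecond : new int[] {10_000, 1_000, 100}) {
            double millis = numDocs * 1000.0 / docsPerSecond; // simulated wall time
            double rollovers = millis / (1 << 16);            // ts>>>16 period is 2^16 ms
            System.out.printf("%d docs/s: ~%.1f rollovers of ts>>>16%n", docsPerSecond, rollovers);
        }
        // prints ~0.8, ~7.6, ~76.3: slower rates fragment shared prefixes more
    }
}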

ESIndexLevelReplicationTestCase.java

@@ -602,7 +602,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
     private TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest request) throws Exception {
         for (BulkItemRequest itemRequest : request.items()) {
             if (itemRequest.request() instanceof IndexRequest) {
-                ((IndexRequest) itemRequest.request()).process(null, index.getName());
+                ((IndexRequest) itemRequest.request()).process(Version.CURRENT, null, index.getName());
             }
         }
         final TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> result =

SyncedFlushSingleNodeTests.java

@@ -56,7 +56,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
         Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
         assertEquals("exactly one commit id", 1, commitIds.size());
         client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON).get();
-        String syncId = UUIDs.base64UUID();
+        String syncId = UUIDs.randomBase64UUID();
         SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>();
         flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
         listener.latch.await();
@@ -178,7 +178,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
             client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON).get();
         }
         client().admin().indices().prepareFlush("test").setForce(true).get();
-        String syncId = UUIDs.base64UUID();
+        String syncId = UUIDs.randomBase64UUID();
         final SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
         flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
         listener.latch.await();
@@ -208,7 +208,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
         Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
         assertEquals("exactly one commit id", 1, commitIds.size());
         commitIds.clear(); // wipe it...
-        String syncId = UUIDs.base64UUID();
+        String syncId = UUIDs.randomBase64UUID();
         SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
         flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
         listener.latch.await();

StartRecoveryRequestTests.java

@@ -24,7 +24,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
-import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
@@ -36,7 +35,6 @@ import java.io.ByteArrayOutputStream;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static org.elasticsearch.test.VersionUtils.randomVersion;
-import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
 import static org.hamcrest.Matchers.equalTo;

 public class StartRecoveryRequestTests extends ESTestCase {
@@ -45,7 +43,7 @@ public class StartRecoveryRequestTests extends ESTestCase {
         final Version targetNodeVersion = randomVersion(random());
         final StartRecoveryRequest outRequest = new StartRecoveryRequest(
             new ShardId("test", "_na_", 0),
-            UUIDs.base64UUID(),
+            UUIDs.randomBase64UUID(),
             new DiscoveryNode("a", buildNewFakeTransportAddress(), emptyMap(), emptySet(), targetNodeVersion),
             new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), targetNodeVersion),
             Store.MetadataSnapshot.EMPTY,

cat.shards REST test (YAML)

@@ -106,9 +106,10 @@
       cat.shards:
         index: sync_id_test
         h: index,state,sync_id
+# 20 chars for sync ids with 5.x which uses time-based uuids and 22 with 6.x which uses random uuids
   - match:
       $body: |
-        /^(sync_id_test\s+STARTED\s+[A-Za-z0-9_\-]{20}\n){5}$/
+        /^(sync_id_test\s+STARTED\s+[A-Za-z0-9_\-]{20,22}\n){5}$/

   - do:
       indices.delete:
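The quantifier widens from {20} to {20,22} because unpadded url-safe base64 maps every 3 bytes to 4 chars: the 15-byte time-based sync ids written by 5.x encode to exactly 20 chars, while the 16-byte random ids written by 6.x encode to 22. A pure-JDK check:

import java.util.Base64;

public class IdLengths {
    public static void main(String[] args) {
        Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
        System.out.println(encoder.encodeToString(new byte[15]).length()); // 20
        System.out.println(encoder.encodeToString(new byte[16]).length()); // 22
    }
}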