Commit a0ee98c

Integrate stored fields format bloom filter with synthetic _id (#138515)
This commit integrates ES93BloomFilterStoredFieldsFormat with synthetic _id lookups. It introduces a dedicated Codec meant to be used only by TIME_SERIES indices; a dedicated codec is necessary so that it can be loaded through SPI (e.g., after a shard relocation or node restart). The new codec wraps the existing one and adds the plumbing needed to populate the bloom filter during indexing.
1 parent e34bc73 commit a0ee98c
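
To make the change concrete, here is a minimal sketch (not the actual Elasticsearch lookup path) of how a per-segment bloom filter can short-circuit an _id lookup once the _id field has neither postings nor a stored value. BloomFilter and its mayContainTerm method are introduced by this commit (see the new interface below); the bloomFilterFor accessor is hypothetical and stands in for the plumbing provided by the new stored fields format.

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.codec.bloomfilter.BloomFilter;
import org.elasticsearch.index.mapper.IdFieldMapper;

import java.io.IOException;

final class SyntheticIdLookupSketch {

    /** Returns false only when the segment definitely does not contain the given _id. */
    boolean segmentMayContainId(LeafReaderContext leaf, BytesRef id) throws IOException {
        BloomFilter filter = bloomFilterFor(leaf); // hypothetical accessor for the segment's filter
        // A negative answer is definitive and lets the caller skip the segment entirely;
        // a positive answer is only "maybe" and still has to be confirmed against the
        // synthetic _id reconstructed from the document's dimensions and timestamp.
        return filter.mayContainTerm(IdFieldMapper.NAME, id);
    }

    private BloomFilter bloomFilterFor(LeafReaderContext leaf) {
        // Placeholder: in this sketch we degrade to "no filter", which always answers "maybe".
        return BloomFilter.NO_FILTER;
    }
}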

20 files changed: +830 additions, -259 deletions

docs/changelog/138515.yaml (5 additions & 0 deletions)

@@ -0,0 +1,5 @@
+pr: 138515
+summary: Integrate stored fields format bloom filter with synthetic `_id`
+area: Codec
+type: enhancement
+issues: []

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBSyntheticIdsIT.java (51 additions & 2 deletions)

@@ -25,6 +25,8 @@
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.codec.CodecService;
+import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.plugins.Plugin;

@@ -54,6 +56,7 @@
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;

 /**
  * Test suite for time series indices that use synthetic ids for documents.

@@ -101,6 +104,40 @@ public void testInvalidIndexMode() {
         );
     }

+    public void testInvalidCodec() {
+        assumeTrue("Test should only run with feature flag", IndexSettings.TSDB_SYNTHETIC_ID_FEATURE_FLAG);
+        final var indexName = randomIdentifier();
+        internalCluster().startDataOnlyNode();
+        var randomNonDefaultCodec = randomFrom(
+            CodecService.BEST_COMPRESSION_CODEC,
+            CodecService.LEGACY_DEFAULT_CODEC,
+            CodecService.BEST_COMPRESSION_CODEC,
+            CodecService.LUCENE_DEFAULT_CODEC
+        );
+
+        var exception = expectThrows(
+            IllegalArgumentException.class,
+            () -> createIndex(
+                indexName,
+                indexSettings(1, 0).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
+                    .put("index.routing_path", "hostname")
+                    .put(IndexSettings.USE_SYNTHETIC_ID.getKey(), true)
+                    .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), randomNonDefaultCodec)
+                    .build()
+            )
+        );
+        assertThat(
+            exception.getMessage(),
+            containsString(
+                "The setting ["
+                    + IndexSettings.USE_SYNTHETIC_ID.getKey()
+                    + "] is only permitted when [index.codec] is set to [default]. Current mode: ["
+                    + randomNonDefaultCodec
+                    + "]."
+            )
+        );
+    }
+
     public void testSyntheticId() throws Exception {
         assumeTrue("Test should only run with feature flag", IndexSettings.TSDB_SYNTHETIC_ID_FEATURE_FLAG);
         final var dataStreamName = randomIdentifier();

@@ -260,12 +297,19 @@ enum Operation {

         flush(dataStreamName);

+        // TODO: Restart the node or relocate the shard randomly
+
         // Check that the synthetic _id field has no postings on disk
         var indices = new HashSet<>(docs.values());
         for (var index : indices) {
             var diskUsage = diskUsage(index);
             var diskUsageIdField = AnalyzeIndexDiskUsageTestUtils.getPerFieldDiskUsage(diskUsage, IdFieldMapper.NAME);
-            assertThat("_id field should not have postings on disk", diskUsageIdField.getInvertedIndexBytes(), equalTo(0L));
+            // When _ids are only used to populate the bloom filter,
+            // IndexDiskUsageStats won't account for anything since
+            // the bloom filter is not exposed through the Reader API and
+            // the analyzer expects to get documents with fields to do the
+            // disk usage accounting.
+            assertThat(diskUsageIdField, nullValue());
         }
     }

@@ -376,7 +420,12 @@ public void testGetFromTranslogBySyntheticId() throws Exception {
         for (var index : indices) {
             var diskUsage = diskUsage(index);
             var diskUsageIdField = AnalyzeIndexDiskUsageTestUtils.getPerFieldDiskUsage(diskUsage, IdFieldMapper.NAME);
-            assertThat("_id field should not have postings on disk", diskUsageIdField.getInvertedIndexBytes(), equalTo(0L));
+            // When _ids are only used to populate the bloom filter,
+            // IndexDiskUsageStats won't account for anything since
+            // the bloom filter is not exposed through the Reader API and
+            // the analyzer expects to get documents with fields to do the
+            // disk usage accounting.
+            assertThat(diskUsageIdField, nullValue());
         }

         assertHitCount(client().prepareSearch(dataStreamName).setSize(0), 10L);

server/src/main/java/module-info.java (3 additions & 1 deletion)

@@ -245,6 +245,7 @@
     exports org.elasticsearch.index.codec;
     exports org.elasticsearch.index.codec.tsdb;
     exports org.elasticsearch.index.codec.bloomfilter;
+    exports org.elasticsearch.index.codec.storedfields;
     exports org.elasticsearch.index.codec.zstd;
     exports org.elasticsearch.index.engine;
     exports org.elasticsearch.index.fielddata;

@@ -481,7 +482,8 @@
         org.elasticsearch.index.codec.Elasticsearch816Codec,
         org.elasticsearch.index.codec.Elasticsearch900Codec,
         org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec,
-        org.elasticsearch.index.codec.Elasticsearch92Lucene103Codec;
+        org.elasticsearch.index.codec.Elasticsearch92Lucene103Codec,
+        org.elasticsearch.index.codec.ES93TSDBDefaultCompressionLucene103Codec;

     provides org.apache.logging.log4j.core.util.ContextDataProvider with org.elasticsearch.common.logging.DynamicContextDataProvider;

server/src/main/java/org/elasticsearch/index/IndexSettings.java (17 additions & 1 deletion)

@@ -27,6 +27,7 @@
 import org.elasticsearch.common.util.FeatureFlag;
 import org.elasticsearch.core.Booleans;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.mapper.IgnoredSourceFieldMapper;
 import org.elasticsearch.index.mapper.Mapper;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;

@@ -49,6 +50,7 @@

 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_VERSION_CREATED;
 import static org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING;
+import static org.elasticsearch.index.engine.EngineConfig.INDEX_CODEC_SETTING;
 import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
 import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_DIMENSION_FIELDS_LIMIT_SETTING;
 import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING;

@@ -715,12 +717,26 @@ public void validate(Boolean enabled, Map<Setting<?>, Object> settings) {
                     )
                 );
             }
+
+            var codecName = (String) settings.get(INDEX_CODEC_SETTING);
+            if (codecName.equals(CodecService.DEFAULT_CODEC) == false) {
+                throw new IllegalArgumentException(
+                    String.format(
+                        Locale.ROOT,
+                        "The setting [%s] is only permitted when [%s] is set to [%s]. Current mode: [%s].",
+                        USE_SYNTHETIC_ID.getKey(),
+                        INDEX_CODEC_SETTING.getKey(),
+                        CodecService.DEFAULT_CODEC,
+                        codecName
+                    )
+                );
+            }
         }
     }

     @Override
     public Iterator<Setting<?>> settings() {
-        List<Setting<?>> list = List.of(MODE);
+        List<Setting<?>> list = List.of(MODE, INDEX_CODEC_SETTING);
         return list.iterator();
     }
 },
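
For orientation, a small sketch of what the new validation accepts and rejects, mirroring the integration test above; the settings keys come from the constants shown in this diff, while the routing path value is illustrative.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.EngineConfig;

final class SyntheticIdSettingsSketch {

    /** Accepted: synthetic _id together with the default codec. */
    static Settings accepted() {
        return Settings.builder()
            .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
            .put("index.routing_path", "hostname")
            .put(IndexSettings.USE_SYNTHETIC_ID.getKey(), true)
            .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), CodecService.DEFAULT_CODEC)
            .build();
    }

    /** Rejected: any non-default codec now trips the IllegalArgumentException added above. */
    static Settings rejected() {
        return Settings.builder()
            .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
            .put("index.routing_path", "hostname")
            .put(IndexSettings.USE_SYNTHETIC_ID.getKey(), true)
            .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), CodecService.BEST_COMPRESSION_CODEC)
            .build();
    }
}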

server/src/main/java/org/elasticsearch/index/IndexVersions.java (1 addition & 0 deletions)

@@ -203,6 +203,7 @@ private static Version parseUnchecked(String version) {
     public static final IndexVersion SKIPPER_DEFAULTS_ONLY_ON_TSDB = def(9_052_0_00, Version.LUCENE_10_3_2);
     public static final IndexVersion DISK_BBQ_LICENSE_ENFORCEMENT = def(9_053_0_00, Version.LUCENE_10_3_2);
     public static final IndexVersion STORE_IGNORED_KEYWORDS_IN_BINARY_DOC_VALUES = def(9_054_0_00, Version.LUCENE_10_3_2);
+    public static final IndexVersion TIME_SERIES_USE_STORED_FIELDS_BLOOM_FILTER_FOR_ID = def(9_055_0_00, Version.LUCENE_10_3_2);

     /*
      * STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/index/codec/CodecService.java (12 additions & 9 deletions)

@@ -16,8 +16,7 @@
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.FeatureFlag;
 import org.elasticsearch.core.Nullable;
-import org.elasticsearch.index.IndexMode;
-import org.elasticsearch.index.codec.tsdb.TSDBSyntheticIdCodec;
+import org.elasticsearch.index.IndexVersions;
 import org.elasticsearch.index.codec.zstd.Zstd814StoredFieldsFormat;
 import org.elasticsearch.index.mapper.MapperService;

@@ -48,8 +47,17 @@ public class CodecService implements CodecProvider {
     public CodecService(@Nullable MapperService mapperService, BigArrays bigArrays) {
         final var codecs = new HashMap<String, Codec>();

-        Codec legacyBestSpeedCodec = new LegacyPerFieldMapperCodec(Lucene103Codec.Mode.BEST_SPEED, mapperService, bigArrays);
-        if (ZSTD_STORED_FIELDS_FEATURE_FLAG) {
+        boolean useSyntheticId = mapperService != null
+            && mapperService.getIndexSettings().useTimeSeriesSyntheticId()
+            && mapperService.getIndexSettings()
+                .getIndexVersionCreated()
+                .onOrAfter(IndexVersions.TIME_SERIES_USE_STORED_FIELDS_BLOOM_FILTER_FOR_ID);
+
+        var legacyBestSpeedCodec = new LegacyPerFieldMapperCodec(Lucene103Codec.Mode.BEST_SPEED, mapperService, bigArrays);
+        if (useSyntheticId) {
+            // Use the default Lucene compression when the synthetic id is used even if the ZSTD feature flag is enabled
+            codecs.put(DEFAULT_CODEC, new ES93TSDBDefaultCompressionLucene103Codec(legacyBestSpeedCodec, bigArrays));
+        } else if (ZSTD_STORED_FIELDS_FEATURE_FLAG) {
             codecs.put(DEFAULT_CODEC, new PerFieldMapperCodec(Zstd814StoredFieldsFormat.Mode.BEST_SPEED, mapperService, bigArrays));
         } else {
             codecs.put(DEFAULT_CODEC, legacyBestSpeedCodec);

@@ -67,8 +75,6 @@ public CodecService(@Nullable MapperService mapperService, BigArrays bigArrays)
         for (String codec : Codec.availableCodecs()) {
             codecs.put(codec, Codec.forName(codec));
         }
-        final boolean useTsdbSyntheticId = mapperService != null && mapperService.getIndexSettings().useTimeSeriesSyntheticId();
-        assert useTsdbSyntheticId == false || mapperService.getIndexSettings().getMode() == IndexMode.TIME_SERIES;

         this.codecs = codecs.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> {
             Codec codec;

@@ -77,9 +83,6 @@ public CodecService(@Nullable MapperService mapperService, BigArrays bigArrays)
             } else {
                 codec = new DeduplicateFieldInfosCodec(e.getValue().getName(), e.getValue());
             }
-            if (useTsdbSyntheticId && codec instanceof TSDBSyntheticIdCodec == false) {
-                codec = new TSDBSyntheticIdCodec(codec.getName(), codec);
-            }
             return codec;
         }));
     }

server/src/main/java/org/elasticsearch/index/codec/ES93TSDBDefaultCompressionLucene103Codec.java (24 additions & 0 deletions)

@@ -0,0 +1,24 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec;
+
+import org.apache.lucene.codecs.lucene103.Lucene103Codec;
+import org.elasticsearch.common.util.BigArrays;
+
+public class ES93TSDBDefaultCompressionLucene103Codec extends TSDBCodecWithSyntheticId {
+    /** Public no-arg constructor, needed for SPI loading at read-time. */
+    public ES93TSDBDefaultCompressionLucene103Codec() {
+        this(new Lucene103Codec(), null);
+    }
+
+    ES93TSDBDefaultCompressionLucene103Codec(Lucene103Codec delegate, BigArrays bigArrays) {
+        super("ES93TSDBDefaultCompressionLucene103Codec", delegate, bigArrays);
+    }
+}
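
A short aside on why the public no-arg constructor above matters: at read time (for example after a shard relocation or node restart) Lucene resolves the codec by name through SPI and instantiates it reflectively, without access to BigArrays. A minimal sketch of that lookup, assuming the codec is registered as a Codec service provider (the module-info.java change above does exactly that):

import org.apache.lucene.codecs.Codec;

public class CodecSpiLookupSketch {
    public static void main(String[] args) {
        // SPI resolves the codec by the name passed to the super constructor and
        // instantiates it through the public no-arg constructor.
        Codec codec = Codec.forName("ES93TSDBDefaultCompressionLucene103Codec");
        System.out.println(codec.getName());
    }
}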

server/src/main/java/org/elasticsearch/index/codec/TSDBCodecWithSyntheticId.java (63 additions & 0 deletions)

@@ -0,0 +1,63 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec;
+
+import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.codecs.FilterCodec;
+import org.apache.lucene.codecs.StoredFieldsFormat;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.index.codec.bloomfilter.ES93BloomFilterStoredFieldsFormat;
+import org.elasticsearch.index.codec.storedfields.TSDBStoredFieldsFormat;
+import org.elasticsearch.index.codec.tsdb.TSDBSyntheticIdCodec;
+import org.elasticsearch.index.mapper.IdFieldMapper;
+
+/**
+ * Abstract base class for ES codecs used with time-series ({@code TIME_SERIES}) indices
+ * that employ synthetic document IDs for storage optimization.
+ *
+ * <p>This class configures the codec to use the following formats:
+ * <ul>
+ *   <li>
+ *     {@link TSDBSyntheticIdCodec} as the underlying codec, which synthesizes the {@code _id} field from
+ *     the values of other fields of the document (e.g. _tsid, @timestamp) so that no inverted index
+ *     or stored field is required for the {@code _id}. As a result, looking up documents by {@code _id}
+ *     can be very slow, which is why it is used together with a bloom filter.
+ *   </li>
+ *   <li>
+ *     {@link TSDBStoredFieldsFormat} with bloom filter optimization for efficient ID lookups
+ *   </li>
+ * </ul>
+ *
+ * <p>Synthetic IDs in TSDB indices are generated from the document's dimensions and timestamp,
+ * replacing the standard {@code _id} field to reduce storage overhead.
+ *
+ * @see TSDBSyntheticIdCodec
+ * @see TSDBStoredFieldsFormat
+ */
+abstract class TSDBCodecWithSyntheticId extends FilterCodec {
+    private final TSDBStoredFieldsFormat storedFieldsFormat;
+
+    TSDBCodecWithSyntheticId(String name, Codec delegate, BigArrays bigArrays) {
+        super(name, new TSDBSyntheticIdCodec(delegate));
+        this.storedFieldsFormat = new TSDBStoredFieldsFormat(
+            delegate.storedFieldsFormat(),
+            new ES93BloomFilterStoredFieldsFormat(
+                bigArrays,
+                ES93BloomFilterStoredFieldsFormat.DEFAULT_BLOOM_FILTER_SIZE,
+                IdFieldMapper.NAME
+            )
+        );
+    }
+
+    @Override
+    public StoredFieldsFormat storedFieldsFormat() {
+        return storedFieldsFormat;
+    }
+}

server/src/main/java/org/elasticsearch/index/codec/bloomfilter/BloomFilter.java (38 additions & 0 deletions)

@@ -0,0 +1,38 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.bloomfilter;
+
+import org.apache.lucene.util.BytesRef;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface BloomFilter extends Closeable {
+    BloomFilter NO_FILTER = new BloomFilter() {
+        @Override
+        public void close() throws IOException {
+
+        }
+
+        @Override
+        public boolean mayContainTerm(String field, BytesRef term) throws IOException {
+            return true;
+        }
+    };
+
+    /**
+     * Tests whether the given term may exist in the specified field.
+     *
+     * @param field the field name to check
+     * @param term the term to test for membership
+     * @return true if the term may be present, false if it is definitely absent
+     */
+    boolean mayContainTerm(String field, BytesRef term) throws IOException;
+}
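
A small usage sketch of this contract (the caller and the example id bytes are hypothetical): false is definitive and lets a lookup skip the segment, while true only means the term may be present, so positives still have to be verified; NO_FILTER degrades gracefully by always answering "maybe".

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.codec.bloomfilter.BloomFilter;

import java.io.IOException;

final class BloomFilterUsageSketch {

    /** Returns true when the caller still needs to verify the _id against the segment. */
    static boolean needsVerification(BloomFilter filter, BytesRef id) throws IOException {
        // false => the term is definitely absent, so the segment can be skipped;
        // true  => the term may be present (bloom filters can return false positives).
        return filter.mayContainTerm("_id", id);
    }

    public static void main(String[] args) throws IOException {
        // NO_FILTER always answers "maybe", so lookups fall back to full verification.
        BytesRef illustrativeId = new BytesRef("synthetic-id-bytes");
        System.out.println(needsVerification(BloomFilter.NO_FILTER, illustrativeId));
    }
}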
