Skip to content

Commit 7ea22e9

Browse files
committed
Move commit related stuff to stateless
Migrating to stateless: PrimaryTermAndGeneration, ClosedShardService, BlobFile, BatchedCompoundCommit, blob ranges, ReplicatedContent, VirtualBatchedCompoundCommit, and related functionality and tests. Relates ES-13590.
1 parent fc78e02 commit 7ea22e9

27 files changed

+5094
-1
lines changed

x-pack/plugin/stateless/src/main/java/module-info.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,17 @@
77

88
module org.elasticsearch.xpack.stateless {
    // Elasticsearch and third-party modules this plugin reads.
    requires org.elasticsearch.base;
    requires org.elasticsearch.blobcache;
    requires org.elasticsearch.logging;
    requires org.elasticsearch.server;
    requires org.elasticsearch.xcore;
    requires org.elasticsearch.xcontent;
    requires org.apache.logging.log4j;
    requires org.apache.lucene.core;

    // Packages exposed to other modules (commit, cache, engine and lucene helpers
    // were migrated into this plugin as part of this change).
    exports org.elasticsearch.xpack.stateless;
    exports org.elasticsearch.xpack.stateless.cache;
    exports org.elasticsearch.xpack.stateless.commits;
    exports org.elasticsearch.xpack.stateless.engine;
    exports org.elasticsearch.xpack.stateless.lucene;
}

x-pack/plugin/stateless/src/main/java/org/elasticsearch/xpack/stateless/StatelessPlugin.java

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88

99
import org.elasticsearch.cluster.node.DiscoveryNode;
1010
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
11+
import org.elasticsearch.cluster.routing.ShardRouting;
1112
import org.elasticsearch.common.settings.Setting;
1213
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.common.util.concurrent.EsExecutors;
15+
import org.elasticsearch.core.TimeValue;
1316
import org.elasticsearch.license.License;
1417
import org.elasticsearch.license.LicensedFeature;
1518
import org.elasticsearch.license.XPackLicenseState;
@@ -19,6 +22,9 @@
1922
import org.elasticsearch.plugins.ClusterCoordinationPlugin;
2023
import org.elasticsearch.plugins.ExtensiblePlugin;
2124
import org.elasticsearch.plugins.Plugin;
25+
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
26+
import org.elasticsearch.threadpool.ExecutorBuilder;
27+
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
2228
import org.elasticsearch.xpack.core.XPackPlugin;
2329

2430
import java.io.IOException;
@@ -54,7 +60,168 @@ public class StatelessPlugin extends Plugin implements ClusterCoordinationPlugin
5460

5561
// Plugin name used for licensing/feature registration.
public static final String NAME = "stateless";

// Thread pool names are defined in the BlobStoreRepository because we need to verify there that no requests are running on other pools.
public static final String SHARD_READ_THREAD_POOL = BlobStoreRepository.STATELESS_SHARD_READ_THREAD_NAME;
public static final String SHARD_READ_THREAD_POOL_SETTING = "stateless." + SHARD_READ_THREAD_POOL + "_thread_pool";
public static final String TRANSLOG_THREAD_POOL = BlobStoreRepository.STATELESS_TRANSLOG_THREAD_NAME;
public static final String TRANSLOG_THREAD_POOL_SETTING = "stateless." + TRANSLOG_THREAD_POOL + "_thread_pool";
public static final String SHARD_WRITE_THREAD_POOL = BlobStoreRepository.STATELESS_SHARD_WRITE_THREAD_NAME;
public static final String SHARD_WRITE_THREAD_POOL_SETTING = "stateless." + SHARD_WRITE_THREAD_POOL + "_thread_pool";
public static final String CLUSTER_STATE_READ_WRITE_THREAD_POOL = BlobStoreRepository.STATELESS_CLUSTER_STATE_READ_WRITE_THREAD_NAME;
public static final String CLUSTER_STATE_READ_WRITE_THREAD_POOL_SETTING = "stateless."
    + CLUSTER_STATE_READ_WRITE_THREAD_POOL
    + "_thread_pool";
public static final String GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL = "stateless_get_vbcc_chunk";
public static final String GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL_SETTING = "stateless."
    + GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL
    + "_thread_pool";
public static final String FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CACHE_THREAD_POOL = "stateless_fill_vbcc_cache";
// NOTE(review): the constant name says "CHUNK" but the value is derived from the CACHE pool above —
// presumably a typo in the constant name; renaming would be an API change, so flagging only. Verify.
public static final String FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL_SETTING = "stateless."
    + FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CACHE_THREAD_POOL
    + "_thread_pool";
public static final String PREWARM_THREAD_POOL = BlobStoreRepository.STATELESS_SHARD_PREWARMING_THREAD_NAME;
public static final String PREWARM_THREAD_POOL_SETTING = "stateless." + PREWARM_THREAD_POOL + "_thread_pool";
public static final String UPLOAD_PREWARM_THREAD_POOL = BlobStoreRepository.STATELESS_SHARD_UPLOAD_PREWARMING_THREAD_NAME;
public static final String UPLOAD_PREWARM_THREAD_POOL_SETTING = "stateless." + UPLOAD_PREWARM_THREAD_POOL + "_thread_pool";

/**
 * The set of {@link ShardRouting.Role}s that we expect to see in a stateless deployment
 */
public static final Set<ShardRouting.Role> STATELESS_SHARD_ROLES = Set.of(ShardRouting.Role.INDEX_ONLY, ShardRouting.Role.SEARCH_ONLY);

// Whether the stateless plugin is active on this node.
private final boolean enabled;
// Whether this node carries the index role; selects the indexing-side thread pool sizing.
private final boolean hasIndexRole;
95+
/**
 * Builds the scaling thread-pool (executor) configuration for a stateless node.
 * <p>
 * Index-role nodes get larger write/translog pools; non-index (search) nodes get a larger
 * shard-read pool and a dedicated pool for filling the virtual batched compound commit cache.
 *
 * @param settings     node settings, used to derive the allocated processor count
 * @param hasIndexRole whether this node has the index role (selects the sizing branch)
 * @return the executor builders to register with the node's thread pool
 */
public static ExecutorBuilder<?>[] statelessExecutorBuilders(Settings settings, boolean hasIndexRole) {
    // TODO: Consider modifying these pool counts if we change the object store client connections based on node size.
    // Right now we have 10 threads for snapshots, 1 or 8 threads for translog and 20 or 28 threads for shard thread pools. This is to
    // attempt to keep the threads below the default client connections limit of 50. This assumption is currently broken by the snapshot
    // metadata pool having 50 threads. But we will continue to iterate on this numbers and limits.

    final int processors = EsExecutors.allocatedProcessors(settings);
    final int shardReadMaxThreads;
    final int translogCoreThreads;
    final int translogMaxThreads;
    final int shardWriteCoreThreads;
    final int shardWriteMaxThreads;
    final int clusterStateReadWriteCoreThreads;
    final int clusterStateReadWriteMaxThreads;
    final int getVirtualBatchedCompoundCommitChunkCoreThreads;
    final int getVirtualBatchedCompoundCommitChunkMaxThreads;
    final int fillVirtualBatchedCompoundCommitCacheCoreThreads;
    final int fillVirtualBatchedCompoundCommitCacheMaxThreads;
    final int prewarmMaxThreads;
    final int uploadPrewarmCoreThreads;
    final int uploadPrewarmMaxThreads;

    if (hasIndexRole) {
        // Indexing nodes: favor write-side pools; read/chunk pools stay modest.
        shardReadMaxThreads = Math.min(processors * 4, 10);
        translogCoreThreads = 2;
        translogMaxThreads = Math.min(processors * 2, 8);
        shardWriteCoreThreads = 2;
        shardWriteMaxThreads = Math.min(processors * 4, 10);
        clusterStateReadWriteCoreThreads = 2;
        clusterStateReadWriteMaxThreads = 4;
        getVirtualBatchedCompoundCommitChunkCoreThreads = 1;
        getVirtualBatchedCompoundCommitChunkMaxThreads = Math.min(processors, 4);
        fillVirtualBatchedCompoundCommitCacheCoreThreads = 0;
        fillVirtualBatchedCompoundCommitCacheMaxThreads = 1;
        prewarmMaxThreads = Math.min(processors * 2, 32);
        // These threads are used for prewarming the shared blob cache on upload, and are separate from the prewarm thread pool
        // in order to avoid any deadlocks between the two (e.g., when two fillgaps compete). Since they are used to prewarm on upload,
        // we use the same amount of max threads as the shard write pool.
        // these threads use a sizeable thread-local direct buffer which might take a while to GC, so we prefer to keep some idle
        // threads around to reduce churn and re-use the existing buffers more
        uploadPrewarmMaxThreads = Math.min(processors * 4, 10);
        uploadPrewarmCoreThreads = uploadPrewarmMaxThreads / 2;
    } else {
        // Search nodes: write-side pools shrink to a single thread; read and cache-fill pools grow.
        shardReadMaxThreads = Math.min(processors * 4, 28);
        translogCoreThreads = 0;
        translogMaxThreads = 1;
        shardWriteCoreThreads = 0;
        shardWriteMaxThreads = 1;
        clusterStateReadWriteCoreThreads = 0;
        clusterStateReadWriteMaxThreads = 1;
        getVirtualBatchedCompoundCommitChunkCoreThreads = 0;
        getVirtualBatchedCompoundCommitChunkMaxThreads = 1;
        prewarmMaxThreads = Math.min(processors * 4, 32);
        // these threads use a sizeable thread-local direct buffer which might take a while to GC, so we prefer to keep some idle
        // threads around to reduce churn and re-use the existing buffers more
        fillVirtualBatchedCompoundCommitCacheCoreThreads = Math.max(processors / 2, 2);
        fillVirtualBatchedCompoundCommitCacheMaxThreads = Math.max(processors, 2);
        uploadPrewarmCoreThreads = 0;
        uploadPrewarmMaxThreads = 1;
    }

    return new ExecutorBuilder<?>[] {
        new ScalingExecutorBuilder(
            SHARD_READ_THREAD_POOL,
            4,
            shardReadMaxThreads,
            TimeValue.timeValueMinutes(5),
            true,
            SHARD_READ_THREAD_POOL_SETTING,
            // Read pool additionally tracks ongoing tasks / execution time (EWMA alpha 0.3).
            EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(0.3).build()
        ),
        new ScalingExecutorBuilder(
            TRANSLOG_THREAD_POOL,
            translogCoreThreads,
            translogMaxThreads,
            TimeValue.timeValueMinutes(5),
            true,
            TRANSLOG_THREAD_POOL_SETTING
        ),
        new ScalingExecutorBuilder(
            SHARD_WRITE_THREAD_POOL,
            shardWriteCoreThreads,
            shardWriteMaxThreads,
            TimeValue.timeValueMinutes(5),
            true,
            SHARD_WRITE_THREAD_POOL_SETTING
        ),
        new ScalingExecutorBuilder(
            CLUSTER_STATE_READ_WRITE_THREAD_POOL,
            clusterStateReadWriteCoreThreads,
            clusterStateReadWriteMaxThreads,
            TimeValue.timeValueMinutes(5),
            true,
            CLUSTER_STATE_READ_WRITE_THREAD_POOL_SETTING
        ),
        new ScalingExecutorBuilder(
            GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL,
            getVirtualBatchedCompoundCommitChunkCoreThreads,
            getVirtualBatchedCompoundCommitChunkMaxThreads,
            TimeValue.timeValueMinutes(5),
            true,
            GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL_SETTING
        ),
        new ScalingExecutorBuilder(
            FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CACHE_THREAD_POOL,
            fillVirtualBatchedCompoundCommitCacheCoreThreads,
            fillVirtualBatchedCompoundCommitCacheMaxThreads,
            TimeValue.timeValueMinutes(5),
            true,
            FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL_SETTING
        ),
        new ScalingExecutorBuilder(
            PREWARM_THREAD_POOL,
            // these threads use a sizeable thread-local direct buffer which might take a while to GC, so we prefer to keep some idle
            // threads around to reduce churn and re-use the existing buffers more
            prewarmMaxThreads / 2,
            prewarmMaxThreads,
            TimeValue.timeValueMinutes(5),
            true,
            PREWARM_THREAD_POOL_SETTING
        ),
        new ScalingExecutorBuilder(
            UPLOAD_PREWARM_THREAD_POOL,
            uploadPrewarmCoreThreads,
            uploadPrewarmMaxThreads,
            TimeValue.timeValueMinutes(5),
            true,
            UPLOAD_PREWARM_THREAD_POOL_SETTING
        ) };
}
58225

59226
@Override
60227
public List<Setting<?>> getSettings() {
@@ -106,6 +273,7 @@ public StatelessPlugin(Settings settings) {
106273
);
107274
}
108275
}
276+
hasIndexRole = DiscoveryNode.hasRole(settings, DiscoveryNodeRole.INDEX_ROLE);
109277
}
110278

111279
@Override
@@ -152,4 +320,14 @@ public void close() throws IOException {
152320
public boolean isEnabled() {
153321
return enabled;
154322
}
323+
324+
@Override
325+
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
326+
if (enabled) {
327+
return List.of(statelessExecutorBuilders(settings, hasIndexRole));
328+
} else {
329+
return super.getExecutorBuilders(settings);
330+
}
331+
}
332+
155333
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.stateless.cache;
9+
10+
import org.apache.lucene.codecs.CodecUtil;
11+
import org.apache.lucene.index.CorruptIndexException;
12+
import org.apache.lucene.store.DataInput;
13+
import org.apache.lucene.store.Directory;
14+
import org.apache.lucene.store.IOContext;
15+
import org.apache.lucene.util.CollectionUtil;
16+
import org.apache.lucene.util.StringHelper;
17+
import org.elasticsearch.index.store.LuceneFilesExtensions;
18+
19+
import java.io.IOException;
20+
import java.util.Map;
21+
22+
/**
23+
* This file is mostly copied from org.apache.lucene.codecs.lucene90.Lucene90CompoundReader
24+
* in order to be able to parse compound segment entries in order to prewarm them.
25+
* Currently, it is impossible to reuse the original class as the necessary code has private access
26+
*/
27+
public class Lucene90CompoundEntriesReader {
28+
29+
static final String ENTRY_CODEC = "Lucene90CompoundEntries";
30+
static final int VERSION_START = 0;
31+
static final int VERSION_CURRENT = VERSION_START;
32+
33+
public static Map<String, FileEntry> readEntries(Directory directory, String filename) throws IOException {
34+
assert LuceneFilesExtensions.fromFile(filename) == LuceneFilesExtensions.CFE : filename;
35+
try (var input = directory.openInput(filename, IOContext.READONCE)) {
36+
return Lucene90CompoundEntriesReader.readEntries(input);
37+
}
38+
}
39+
40+
/**
41+
* This method skips the input validation and only lists the entries in a cfe file.
42+
* Validation is going to be performed later once directory is opened for the index engine.
43+
*/
44+
public static Map<String, FileEntry> readEntries(DataInput dataInput) throws IOException {
45+
CodecUtil.checkHeader(dataInput, ENTRY_CODEC, VERSION_START, VERSION_CURRENT);
46+
dataInput.skipBytes(StringHelper.ID_LENGTH);
47+
CodecUtil.checkIndexHeaderSuffix(dataInput, "");
48+
return readMapping(dataInput);
49+
}
50+
51+
private static Map<String, FileEntry> readMapping(DataInput entriesStream) throws IOException {
52+
final int numEntries = entriesStream.readVInt();
53+
var mapping = CollectionUtil.<String, FileEntry>newHashMap(numEntries);
54+
for (int i = 0; i < numEntries; i++) {
55+
final String id = entriesStream.readString();
56+
final FileEntry fileEntry = new FileEntry(entriesStream.readLong(), entriesStream.readLong());
57+
FileEntry previous = mapping.put(id, fileEntry);
58+
if (previous != null) {
59+
throw new CorruptIndexException("Duplicate cfs entry id=" + id + " in CFS ", entriesStream);
60+
}
61+
}
62+
return mapping;
63+
}
64+
65+
public record FileEntry(long offset, long length) {}
66+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.stateless.commits;
9+
10+
import org.elasticsearch.xpack.stateless.engine.PrimaryTermAndGeneration;
11+
12+
/**
 * Common contract for batched compound commit implementations
 * (e.g. {@code BatchedCompoundCommit} and {@code VirtualBatchedCompoundCommit}).
 */
public interface AbstractBatchedCompoundCommit {
    /** The primary term and generation identifying this batched commit. */
    PrimaryTermAndGeneration primaryTermAndGeneration();

    /** The last {@link StatelessCompoundCommit} contained in this batch — presumably the most recent one; verify against implementations. */
    StatelessCompoundCommit lastCompoundCommit();
}

0 commit comments

Comments
 (0)