Skip to content

Commit 57f8a89

Browse files
committed
Move action and commit related stuff to stateless
Migrating: * PrimaryTermAndGeneration * ClosedShardService * BlobFile * BatchedCompoundCommit * Blob ranges * ReplicatedContent * VirtualBatchedCompoundCommit * Some action requests and responses * Shard local commit trackers * StaleCompoundCommit and related functionality and tests. Relates ES-13590
1 parent ad268e0 commit 57f8a89

File tree

44 files changed

+6720
-1
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+6720
-1
lines changed

x-pack/plugin/stateless/src/main/java/module-info.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,18 @@
77

88
module org.elasticsearch.xpack.stateless {
99
requires org.elasticsearch.base;
10+
requires org.elasticsearch.blobcache;
11+
requires org.elasticsearch.logging;
1012
requires org.elasticsearch.server;
1113
requires org.elasticsearch.xcore;
14+
requires org.elasticsearch.xcontent;
1215
requires org.apache.logging.log4j;
13-
requires org.elasticsearch.logging;
16+
requires org.apache.lucene.core;
1417

1518
exports org.elasticsearch.xpack.stateless;
19+
exports org.elasticsearch.xpack.stateless.action;
20+
exports org.elasticsearch.xpack.stateless.cache;
21+
exports org.elasticsearch.xpack.stateless.commits;
22+
exports org.elasticsearch.xpack.stateless.engine;
23+
exports org.elasticsearch.xpack.stateless.lucene;
1624
}

x-pack/plugin/stateless/src/main/java/org/elasticsearch/xpack/stateless/StatelessPlugin.java

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88

99
import org.elasticsearch.cluster.node.DiscoveryNode;
1010
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
11+
import org.elasticsearch.cluster.routing.ShardRouting;
1112
import org.elasticsearch.common.settings.Setting;
1213
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.common.util.concurrent.EsExecutors;
15+
import org.elasticsearch.core.TimeValue;
1316
import org.elasticsearch.license.License;
1417
import org.elasticsearch.license.LicensedFeature;
1518
import org.elasticsearch.license.XPackLicenseState;
@@ -19,6 +22,9 @@
1922
import org.elasticsearch.plugins.ClusterCoordinationPlugin;
2023
import org.elasticsearch.plugins.ExtensiblePlugin;
2124
import org.elasticsearch.plugins.Plugin;
25+
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
26+
import org.elasticsearch.threadpool.ExecutorBuilder;
27+
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
2228
import org.elasticsearch.xpack.core.XPackPlugin;
2329

2430
import java.io.IOException;
@@ -54,7 +60,168 @@ public class StatelessPlugin extends Plugin implements ClusterCoordinationPlugin
5460

5561
public static final String NAME = "stateless";
5662

63+
// Thread pool names are defined in the BlobStoreRepository because we need to verify there that no requests are running on other pools.
64+
public static final String SHARD_READ_THREAD_POOL = BlobStoreRepository.STATELESS_SHARD_READ_THREAD_NAME;
65+
public static final String SHARD_READ_THREAD_POOL_SETTING = "stateless." + SHARD_READ_THREAD_POOL + "_thread_pool";
66+
public static final String TRANSLOG_THREAD_POOL = BlobStoreRepository.STATELESS_TRANSLOG_THREAD_NAME;
67+
public static final String TRANSLOG_THREAD_POOL_SETTING = "stateless." + TRANSLOG_THREAD_POOL + "_thread_pool";
68+
public static final String SHARD_WRITE_THREAD_POOL = BlobStoreRepository.STATELESS_SHARD_WRITE_THREAD_NAME;
69+
public static final String SHARD_WRITE_THREAD_POOL_SETTING = "stateless." + SHARD_WRITE_THREAD_POOL + "_thread_pool";
70+
public static final String CLUSTER_STATE_READ_WRITE_THREAD_POOL = BlobStoreRepository.STATELESS_CLUSTER_STATE_READ_WRITE_THREAD_NAME;
71+
public static final String CLUSTER_STATE_READ_WRITE_THREAD_POOL_SETTING = "stateless."
72+
+ CLUSTER_STATE_READ_WRITE_THREAD_POOL
73+
+ "_thread_pool";
74+
public static final String GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL = "stateless_get_vbcc_chunk";
75+
public static final String GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL_SETTING = "stateless."
76+
+ GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL
77+
+ "_thread_pool";
78+
public static final String FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CACHE_THREAD_POOL = "stateless_fill_vbcc_cache";
79+
public static final String FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL_SETTING = "stateless."
80+
+ FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CACHE_THREAD_POOL
81+
+ "_thread_pool";
82+
public static final String PREWARM_THREAD_POOL = BlobStoreRepository.STATELESS_SHARD_PREWARMING_THREAD_NAME;
83+
public static final String PREWARM_THREAD_POOL_SETTING = "stateless." + PREWARM_THREAD_POOL + "_thread_pool";
84+
public static final String UPLOAD_PREWARM_THREAD_POOL = BlobStoreRepository.STATELESS_SHARD_UPLOAD_PREWARMING_THREAD_NAME;
85+
public static final String UPLOAD_PREWARM_THREAD_POOL_SETTING = "stateless." + UPLOAD_PREWARM_THREAD_POOL + "_thread_pool";
86+
87+
/**
88+
* The set of {@link ShardRouting.Role}s that we expect to see in a stateless deployment
89+
*/
90+
public static final Set<ShardRouting.Role> STATELESS_SHARD_ROLES = Set.of(ShardRouting.Role.INDEX_ONLY, ShardRouting.Role.SEARCH_ONLY);
91+
5792
private final boolean enabled;
93+
private final boolean hasIndexRole;
94+
95+
public static ExecutorBuilder<?>[] statelessExecutorBuilders(Settings settings, boolean hasIndexRole) {
96+
// TODO: Consider modifying these pool counts if we change the object store client connections based on node size.
97+
// Right now we have 10 threads for snapshots, 1 or 8 threads for translog and 20 or 28 threads for shard thread pools. This is to
98+
// attempt to keep the threads below the default client connections limit of 50. This assumption is currently broken by the snapshot
99+
// metadata pool having 50 threads. But we will continue to iterate on this numbers and limits.
100+
101+
final int processors = EsExecutors.allocatedProcessors(settings);
102+
final int shardReadMaxThreads;
103+
final int translogCoreThreads;
104+
final int translogMaxThreads;
105+
final int shardWriteCoreThreads;
106+
final int shardWriteMaxThreads;
107+
final int clusterStateReadWriteCoreThreads;
108+
final int clusterStateReadWriteMaxThreads;
109+
final int getVirtualBatchedCompoundCommitChunkCoreThreads;
110+
final int getVirtualBatchedCompoundCommitChunkMaxThreads;
111+
final int fillVirtualBatchedCompoundCommitCacheCoreThreads;
112+
final int fillVirtualBatchedCompoundCommitCacheMaxThreads;
113+
final int prewarmMaxThreads;
114+
final int uploadPrewarmCoreThreads;
115+
final int uploadPrewarmMaxThreads;
116+
117+
if (hasIndexRole) {
118+
shardReadMaxThreads = Math.min(processors * 4, 10);
119+
translogCoreThreads = 2;
120+
translogMaxThreads = Math.min(processors * 2, 8);
121+
shardWriteCoreThreads = 2;
122+
shardWriteMaxThreads = Math.min(processors * 4, 10);
123+
clusterStateReadWriteCoreThreads = 2;
124+
clusterStateReadWriteMaxThreads = 4;
125+
getVirtualBatchedCompoundCommitChunkCoreThreads = 1;
126+
getVirtualBatchedCompoundCommitChunkMaxThreads = Math.min(processors, 4);
127+
fillVirtualBatchedCompoundCommitCacheCoreThreads = 0;
128+
fillVirtualBatchedCompoundCommitCacheMaxThreads = 1;
129+
prewarmMaxThreads = Math.min(processors * 2, 32);
130+
// These threads are used for prewarming the shared blob cache on upload, and are separate from the prewarm thread pool
131+
// in order to avoid any deadlocks between the two (e.g., when two fillgaps compete). Since they are used to prewarm on upload,
132+
// we use the same amount of max threads as the shard write pool.
133+
// these threads use a sizeable thread-local direct buffer which might take a while to GC, so we prefer to keep some idle
134+
// threads around to reduce churn and re-use the existing buffers more
135+
uploadPrewarmMaxThreads = Math.min(processors * 4, 10);
136+
uploadPrewarmCoreThreads = uploadPrewarmMaxThreads / 2;
137+
} else {
138+
shardReadMaxThreads = Math.min(processors * 4, 28);
139+
translogCoreThreads = 0;
140+
translogMaxThreads = 1;
141+
shardWriteCoreThreads = 0;
142+
shardWriteMaxThreads = 1;
143+
clusterStateReadWriteCoreThreads = 0;
144+
clusterStateReadWriteMaxThreads = 1;
145+
getVirtualBatchedCompoundCommitChunkCoreThreads = 0;
146+
getVirtualBatchedCompoundCommitChunkMaxThreads = 1;
147+
prewarmMaxThreads = Math.min(processors * 4, 32);
148+
// these threads use a sizeable thread-local direct buffer which might take a while to GC, so we prefer to keep some idle
149+
// threads around to reduce churn and re-use the existing buffers more
150+
fillVirtualBatchedCompoundCommitCacheCoreThreads = Math.max(processors / 2, 2);
151+
fillVirtualBatchedCompoundCommitCacheMaxThreads = Math.max(processors, 2);
152+
uploadPrewarmCoreThreads = 0;
153+
uploadPrewarmMaxThreads = 1;
154+
}
155+
156+
return new ExecutorBuilder<?>[] {
157+
new ScalingExecutorBuilder(
158+
SHARD_READ_THREAD_POOL,
159+
4,
160+
shardReadMaxThreads,
161+
TimeValue.timeValueMinutes(5),
162+
true,
163+
SHARD_READ_THREAD_POOL_SETTING,
164+
EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(0.3).build()
165+
),
166+
new ScalingExecutorBuilder(
167+
TRANSLOG_THREAD_POOL,
168+
translogCoreThreads,
169+
translogMaxThreads,
170+
TimeValue.timeValueMinutes(5),
171+
true,
172+
TRANSLOG_THREAD_POOL_SETTING
173+
),
174+
new ScalingExecutorBuilder(
175+
SHARD_WRITE_THREAD_POOL,
176+
shardWriteCoreThreads,
177+
shardWriteMaxThreads,
178+
TimeValue.timeValueMinutes(5),
179+
true,
180+
SHARD_WRITE_THREAD_POOL_SETTING
181+
),
182+
new ScalingExecutorBuilder(
183+
CLUSTER_STATE_READ_WRITE_THREAD_POOL,
184+
clusterStateReadWriteCoreThreads,
185+
clusterStateReadWriteMaxThreads,
186+
TimeValue.timeValueMinutes(5),
187+
true,
188+
CLUSTER_STATE_READ_WRITE_THREAD_POOL_SETTING
189+
),
190+
new ScalingExecutorBuilder(
191+
GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL,
192+
getVirtualBatchedCompoundCommitChunkCoreThreads,
193+
getVirtualBatchedCompoundCommitChunkMaxThreads,
194+
TimeValue.timeValueMinutes(5),
195+
true,
196+
GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL_SETTING
197+
),
198+
new ScalingExecutorBuilder(
199+
FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CACHE_THREAD_POOL,
200+
fillVirtualBatchedCompoundCommitCacheCoreThreads,
201+
fillVirtualBatchedCompoundCommitCacheMaxThreads,
202+
TimeValue.timeValueMinutes(5),
203+
true,
204+
FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL_SETTING
205+
),
206+
new ScalingExecutorBuilder(
207+
PREWARM_THREAD_POOL,
208+
// these threads use a sizeable thread-local direct buffer which might take a while to GC, so we prefer to keep some idle
209+
// threads around to reduce churn and re-use the existing buffers more
210+
prewarmMaxThreads / 2,
211+
prewarmMaxThreads,
212+
TimeValue.timeValueMinutes(5),
213+
true,
214+
PREWARM_THREAD_POOL_SETTING
215+
),
216+
new ScalingExecutorBuilder(
217+
UPLOAD_PREWARM_THREAD_POOL,
218+
uploadPrewarmCoreThreads,
219+
uploadPrewarmMaxThreads,
220+
TimeValue.timeValueMinutes(5),
221+
true,
222+
UPLOAD_PREWARM_THREAD_POOL_SETTING
223+
) };
224+
}
58225

59226
@Override
60227
public List<Setting<?>> getSettings() {
@@ -106,6 +273,7 @@ public StatelessPlugin(Settings settings) {
106273
);
107274
}
108275
}
276+
hasIndexRole = DiscoveryNode.hasRole(settings, DiscoveryNodeRole.INDEX_ROLE);
109277
}
110278

111279
@Override
@@ -152,4 +320,14 @@ public void close() throws IOException {
152320
public boolean isEnabled() {
153321
return enabled;
154322
}
323+
324+
@Override
325+
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
326+
if (enabled) {
327+
return List.of(statelessExecutorBuilders(settings, hasIndexRole));
328+
} else {
329+
return super.getExecutorBuilders(settings);
330+
}
331+
}
332+
155333
}

0 commit comments

Comments
 (0)