Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion x-pack/plugin/stateless/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,19 @@

// Module descriptor for the stateless x-pack plugin: declares the modules it reads and the packages it exports.
// Each module is required exactly once; repeating a `requires` directive for the same module is a compile-time error.
module org.elasticsearch.xpack.stateless {
    requires org.elasticsearch.base;
    requires org.elasticsearch.blobcache;
    requires org.elasticsearch.logging;
    requires org.elasticsearch.server;
    requires org.elasticsearch.xcore;
    requires org.elasticsearch.xcontent;
    requires org.apache.logging.log4j;
    requires org.apache.lucene.core;

    exports org.elasticsearch.xpack.stateless;
    exports org.elasticsearch.xpack.stateless.action;
    exports org.elasticsearch.xpack.stateless.cache;
    exports org.elasticsearch.xpack.stateless.commits;
    exports org.elasticsearch.xpack.stateless.engine;
    exports org.elasticsearch.xpack.stateless.lucene;
    exports org.elasticsearch.xpack.stateless.utils;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicensedFeature;
import org.elasticsearch.license.XPackLicenseState;
Expand All @@ -19,6 +22,9 @@
import org.elasticsearch.plugins.ClusterCoordinationPlugin;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.xpack.core.XPackPlugin;

import java.io.IOException;
Expand Down Expand Up @@ -54,7 +60,168 @@ public class StatelessPlugin extends Plugin implements ClusterCoordinationPlugin

public static final String NAME = "stateless";

// Thread pool names are defined in the BlobStoreRepository because we need to verify there that no requests are running on other pools.
// Each *_THREAD_POOL constant below is the pool name handed to a ScalingExecutorBuilder in statelessExecutorBuilders(), and the
// matching *_THREAD_POOL_SETTING constant ("stateless.<pool name>_thread_pool") is passed as that builder's last constructor
// argument (presumably the settings-key prefix for the pool -- confirm against ScalingExecutorBuilder).
public static final String SHARD_READ_THREAD_POOL = BlobStoreRepository.STATELESS_SHARD_READ_THREAD_NAME;
public static final String SHARD_READ_THREAD_POOL_SETTING = "stateless." + SHARD_READ_THREAD_POOL + "_thread_pool";
public static final String TRANSLOG_THREAD_POOL = BlobStoreRepository.STATELESS_TRANSLOG_THREAD_NAME;
public static final String TRANSLOG_THREAD_POOL_SETTING = "stateless." + TRANSLOG_THREAD_POOL + "_thread_pool";
public static final String SHARD_WRITE_THREAD_POOL = BlobStoreRepository.STATELESS_SHARD_WRITE_THREAD_NAME;
public static final String SHARD_WRITE_THREAD_POOL_SETTING = "stateless." + SHARD_WRITE_THREAD_POOL + "_thread_pool";
public static final String CLUSTER_STATE_READ_WRITE_THREAD_POOL = BlobStoreRepository.STATELESS_CLUSTER_STATE_READ_WRITE_THREAD_NAME;
public static final String CLUSTER_STATE_READ_WRITE_THREAD_POOL_SETTING = "stateless."
+ CLUSTER_STATE_READ_WRITE_THREAD_POOL
+ "_thread_pool";
// Unlike the pools above, the two virtual-batched-compound-commit pool names are literals declared here rather than
// constants taken from BlobStoreRepository.
public static final String GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL = "stateless_get_vbcc_chunk";
public static final String GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL_SETTING = "stateless."
+ GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL
+ "_thread_pool";
public static final String FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CACHE_THREAD_POOL = "stateless_fill_vbcc_cache";
// NOTE(review): this constant is named ..._CHUNK_THREAD_POOL_SETTING but is built from the ..._CACHE_THREAD_POOL name above.
// The mismatch looks like a copy/paste slip; the name is public, so it is left untouched here -- confirm before renaming.
public static final String FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL_SETTING = "stateless."
+ FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CACHE_THREAD_POOL
+ "_thread_pool";
public static final String PREWARM_THREAD_POOL = BlobStoreRepository.STATELESS_SHARD_PREWARMING_THREAD_NAME;
public static final String PREWARM_THREAD_POOL_SETTING = "stateless." + PREWARM_THREAD_POOL + "_thread_pool";
public static final String UPLOAD_PREWARM_THREAD_POOL = BlobStoreRepository.STATELESS_SHARD_UPLOAD_PREWARMING_THREAD_NAME;
public static final String UPLOAD_PREWARM_THREAD_POOL_SETTING = "stateless." + UPLOAD_PREWARM_THREAD_POOL + "_thread_pool";

/**
* The set of {@link ShardRouting.Role}s that we expect to see in a stateless deployment
*/
public static final Set<ShardRouting.Role> STATELESS_SHARD_ROLES = Set.of(ShardRouting.Role.INDEX_ONLY, ShardRouting.Role.SEARCH_ONLY);

// Value reported by isEnabled(); when false, getExecutorBuilders() defers to the superclass instead of registering stateless pools.
private final boolean enabled;
// Set in the constructor from DiscoveryNode.hasRole(settings, DiscoveryNodeRole.INDEX_ROLE); selects the pool sizing profile.
private final boolean hasIndexRole;

/**
 * Creates the executor builders for the thread pools declared by this plugin.
 * <p>
 * Every pool is described by a {@link ScalingExecutorBuilder}. Core and maximum thread counts are derived from the number of
 * allocated processors and from {@code hasIndexRole}, which selects between two sizing profiles.
 *
 * @param settings     settings from which the allocated processor count is read
 * @param hasIndexRole {@code true} to use the sizing profile for nodes with the index role, {@code false} for the other profile
 * @return one builder per stateless thread pool, in a fixed order
 */
public static ExecutorBuilder<?>[] statelessExecutorBuilders(Settings settings, boolean hasIndexRole) {
    // TODO: Consider modifying these pool counts if we change the object store client connections based on node size.
    // Right now we have 10 threads for snapshots, 1 or 8 threads for translog and 20 or 28 threads for shard thread pools. This is to
    // attempt to keep the threads below the default client connections limit of 50. This assumption is currently broken by the snapshot
    // metadata pool having 50 threads. But we will continue to iterate on these numbers and limits.

    final int cpus = EsExecutors.allocatedProcessors(settings);
    // Same five-minute value for every pool below.
    final TimeValue keepAlive = TimeValue.timeValueMinutes(5);

    // Shard reads: the core size is fixed at 4 (see the builder below), only the ceiling depends on the role.
    final int shardReadMax = Math.min(cpus * 4, hasIndexRole ? 10 : 28);

    final int translogCore = hasIndexRole ? 2 : 0;
    final int translogMax = hasIndexRole ? Math.min(cpus * 2, 8) : 1;

    final int shardWriteCore = hasIndexRole ? 2 : 0;
    final int shardWriteMax = hasIndexRole ? Math.min(cpus * 4, 10) : 1;

    final int clusterStateCore = hasIndexRole ? 2 : 0;
    final int clusterStateMax = hasIndexRole ? 4 : 1;

    final int getVbccChunkCore = hasIndexRole ? 1 : 0;
    final int getVbccChunkMax = hasIndexRole ? Math.min(cpus, 4) : 1;

    // Without the index role, these threads use a sizeable thread-local direct buffer which might take a while to GC, so we prefer
    // to keep some idle threads around to reduce churn and re-use the existing buffers more.
    final int fillVbccCacheCore = hasIndexRole ? 0 : Math.max(cpus / 2, 2);
    final int fillVbccCacheMax = hasIndexRole ? 1 : Math.max(cpus, 2);

    // Prewarm threads also use a sizeable thread-local direct buffer which might take a while to GC, hence half of the maximum is
    // kept as core threads to reduce churn and re-use the existing buffers more.
    final int prewarmMax = Math.min(cpus * (hasIndexRole ? 2 : 4), 32);
    final int prewarmCore = prewarmMax / 2;

    // These threads are used for prewarming the shared blob cache on upload, and are separate from the prewarm thread pool
    // in order to avoid any deadlocks between the two (e.g., when two fillgaps compete). Since they are used to prewarm on upload,
    // the index-role profile uses the same amount of max threads as the shard write pool, and keeps half of them as core threads
    // for the same buffer re-use reason as above.
    final int uploadPrewarmMax = hasIndexRole ? Math.min(cpus * 4, 10) : 1;
    final int uploadPrewarmCore = hasIndexRole ? uploadPrewarmMax / 2 : 0;

    return new ExecutorBuilder<?>[] {
        new ScalingExecutorBuilder(
            SHARD_READ_THREAD_POOL,
            4,
            shardReadMax,
            keepAlive,
            true,
            SHARD_READ_THREAD_POOL_SETTING,
            EsExecutors.TaskTrackingConfig.builder().trackOngoingTasks().trackExecutionTime(0.3).build()
        ),
        new ScalingExecutorBuilder(TRANSLOG_THREAD_POOL, translogCore, translogMax, keepAlive, true, TRANSLOG_THREAD_POOL_SETTING),
        new ScalingExecutorBuilder(
            SHARD_WRITE_THREAD_POOL,
            shardWriteCore,
            shardWriteMax,
            keepAlive,
            true,
            SHARD_WRITE_THREAD_POOL_SETTING
        ),
        new ScalingExecutorBuilder(
            CLUSTER_STATE_READ_WRITE_THREAD_POOL,
            clusterStateCore,
            clusterStateMax,
            keepAlive,
            true,
            CLUSTER_STATE_READ_WRITE_THREAD_POOL_SETTING
        ),
        new ScalingExecutorBuilder(
            GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL,
            getVbccChunkCore,
            getVbccChunkMax,
            keepAlive,
            true,
            GET_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL_SETTING
        ),
        new ScalingExecutorBuilder(
            FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CACHE_THREAD_POOL,
            fillVbccCacheCore,
            fillVbccCacheMax,
            keepAlive,
            true,
            FILL_VIRTUAL_BATCHED_COMPOUND_COMMIT_CHUNK_THREAD_POOL_SETTING
        ),
        new ScalingExecutorBuilder(PREWARM_THREAD_POOL, prewarmCore, prewarmMax, keepAlive, true, PREWARM_THREAD_POOL_SETTING),
        new ScalingExecutorBuilder(
            UPLOAD_PREWARM_THREAD_POOL,
            uploadPrewarmCore,
            uploadPrewarmMax,
            keepAlive,
            true,
            UPLOAD_PREWARM_THREAD_POOL_SETTING
        ) };
}

@Override
public List<Setting<?>> getSettings() {
Expand Down Expand Up @@ -106,6 +273,7 @@ public StatelessPlugin(Settings settings) {
);
}
}
hasIndexRole = DiscoveryNode.hasRole(settings, DiscoveryNodeRole.INDEX_ROLE);
}

@Override
Expand Down Expand Up @@ -152,4 +320,14 @@ public void close() throws IOException {
/**
 * Reports the value of this plugin's {@code enabled} flag.
 *
 * @return the {@code enabled} field as set when the plugin was constructed
 */
public boolean isEnabled() {
    return this.enabled;
}

/**
 * Supplies the stateless thread pool builders when the plugin is enabled.
 *
 * @param settings settings forwarded to {@link #statelessExecutorBuilders(Settings, boolean)} or to the superclass
 * @return the stateless executor builders if {@code enabled} is set, otherwise whatever the superclass returns
 */
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
    // Disabled plugin: contribute nothing beyond the superclass behaviour.
    if (enabled == false) {
        return super.getExecutorBuilders(settings);
    }
    return List.of(statelessExecutorBuilders(settings, hasIndexRole));
}

}
Loading