diff --git a/.changelog/3969.txt b/.changelog/3969.txt
new file mode 100644
index 0000000000..8abef0fe1c
--- /dev/null
+++ b/.changelog/3969.txt
@@ -0,0 +1,11 @@
+```release-note:enhancement
+resource/mongodbatlas_stream_processor: Adds the `tier` attribute
+```
+
+```release-note:enhancement
+data-source/mongodbatlas_stream_processor: Adds the `tier` attribute
+```
+
+```release-note:enhancement
+data-source/mongodbatlas_stream_processors: Adds the `tier` attribute
+```
diff --git a/docs/data-sources/stream_processor.md b/docs/data-sources/stream_processor.md
index 7df9dfc9af..84d1e19aa5 100644
--- a/docs/data-sources/stream_processor.md
+++ b/docs/data-sources/stream_processor.md
@@ -64,6 +64,7 @@ resource "mongodbatlas_stream_processor" "stream-processor-sample-example" {
{ "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } }
])
state = "STARTED"
+ tier = "SP30"
}
resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" {
@@ -140,6 +141,7 @@ output "stream_processors_results" {
**NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.
- `stats` (String) The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.
+- `tier` (String) Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures memory and vCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50.
### Nested Schema for `options`
diff --git a/docs/data-sources/stream_processors.md b/docs/data-sources/stream_processors.md
index 88c7afb24d..7cf8b76368 100644
--- a/docs/data-sources/stream_processors.md
+++ b/docs/data-sources/stream_processors.md
@@ -64,6 +64,7 @@ resource "mongodbatlas_stream_processor" "stream-processor-sample-example" {
{ "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } }
])
state = "STARTED"
+ tier = "SP30"
}
resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" {
@@ -155,6 +156,7 @@ Read-Only:
**NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.
- `stats` (String) The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.
+- `tier` (String) Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures memory and vCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50.
- `workspace_name` (String) Label that identifies the stream processing workspace.
diff --git a/docs/resources/stream_processor.md b/docs/resources/stream_processor.md
index 45cbb8cfcd..6395083eee 100644
--- a/docs/resources/stream_processor.md
+++ b/docs/resources/stream_processor.md
@@ -70,6 +70,7 @@ resource "mongodbatlas_stream_processor" "stream-processor-sample-example" {
{ "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } }
])
state = "STARTED"
+ tier = "SP30"
}
resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" {
@@ -144,6 +145,7 @@ output "stream_processors_results" {
- `state` (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`. When a Stream Processor is created without specifying the state, it will default to `CREATED` state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.
**NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.
+- `tier` (String) Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures memory and vCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50.
- `timeouts` (Attributes) (see [below for nested schema](#nestedatt--timeouts))
- `workspace_name` (String) Label that identifies the stream processing workspace.
diff --git a/examples/mongodbatlas_stream_processor/main.tf b/examples/mongodbatlas_stream_processor/main.tf
index 97536848e6..37537cdb19 100644
--- a/examples/mongodbatlas_stream_processor/main.tf
+++ b/examples/mongodbatlas_stream_processor/main.tf
@@ -54,6 +54,7 @@ resource "mongodbatlas_stream_processor" "stream-processor-sample-example" {
{ "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } }
])
state = "STARTED"
+ tier = "SP30"
}
resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" {
diff --git a/internal/service/streamprocessor/model.go b/internal/service/streamprocessor/model.go
index 009e6f6b4e..6cd05bd361 100644
--- a/internal/service/streamprocessor/model.go
+++ b/internal/service/streamprocessor/model.go
@@ -114,6 +114,7 @@ func NewStreamProcessorWithStats(ctx context.Context, projectID, instanceName, w
ProjectID: types.StringPointerValue(&projectID),
State: types.StringPointerValue(&apiResp.State),
Stats: statsTF,
+ Tier: types.StringPointerValue(apiResp.Tier),
}
if workspaceName != "" {
@@ -158,6 +159,7 @@ func NewTFStreamprocessorDSModel(ctx context.Context, projectID, instanceName, w
ProjectID: types.StringPointerValue(&projectID),
State: types.StringPointerValue(&apiResp.State),
Stats: statsTF,
+ Tier: types.StringPointerValue(apiResp.Tier),
}
if workspaceName != "" {
diff --git a/internal/service/streamprocessor/resource.go b/internal/service/streamprocessor/resource.go
index eb42e305e2..6dca7db7b6 100644
--- a/internal/service/streamprocessor/resource.go
+++ b/internal/service/streamprocessor/resource.go
@@ -107,13 +107,12 @@ func (r *streamProcessorRS) Create(ctx context.Context, req resource.CreateReque
}
if needsStarting {
- _, err := connV2.StreamsApi.StartStreamProcessorWithParams(ctx,
- &admin.StartStreamProcessorApiParams{
- GroupId: projectID,
- TenantName: workspaceOrInstanceName,
- ProcessorName: processorName,
- },
- ).Execute()
+ startWithOptions := &admin.StreamsStartStreamProcessorWith{}
+ if plan.Tier.ValueString() != "" {
+ startWithOptions.SetTier(plan.Tier.ValueString())
+ }
+
+ _, err := connV2.StreamsApi.StartStreamProcessorWith(ctx, projectID, workspaceOrInstanceName, processorName, startWithOptions).Execute()
if err != nil {
resp.Diagnostics.AddError(errorCreateStart, err.Error())
return
@@ -237,13 +236,12 @@ func (r *streamProcessorRS) Update(ctx context.Context, req resource.UpdateReque
// start the stream processor if the desired state is started
if plannedState == StartedState {
- _, err := r.Client.AtlasV2.StreamsApi.StartStreamProcessorWithParams(ctx,
- &admin.StartStreamProcessorApiParams{
- GroupId: projectID,
- TenantName: workspaceOrInstanceName,
- ProcessorName: processorName,
- },
- ).Execute()
+ startWithOptions := &admin.StreamsStartStreamProcessorWith{}
+ if plan.Tier.ValueString() != "" {
+ startWithOptions.SetTier(plan.Tier.ValueString())
+ }
+
+ _, err := r.Client.AtlasV2.StreamsApi.StartStreamProcessorWith(ctx, projectID, workspaceOrInstanceName, processorName, startWithOptions).Execute()
if err != nil {
resp.Diagnostics.AddError("Error starting stream processor", err.Error())
return
@@ -257,6 +255,15 @@ func (r *streamProcessorRS) Update(ctx context.Context, req resource.UpdateReque
}
}
+	// Re-fetch the stream processor from the API when it was not restarted above, so the model below reflects the latest server-side state
+ if streamProcessorResp == nil {
+ streamProcessorResp, _, err = r.Client.AtlasV2.StreamsApi.GetStreamProcessorWithParams(ctx, requestParams).Execute()
+ if err != nil {
+ resp.Diagnostics.AddError("Error reading updated stream processor", err.Error())
+ return
+ }
+ }
+
instanceName := plan.InstanceName.ValueString()
workspaceName := plan.WorkspaceName.ValueString()
newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, workspaceName, streamProcessorResp, &plan.Timeouts, &plan.DeleteOnCreateTimeout)
diff --git a/internal/service/streamprocessor/resource_schema.go b/internal/service/streamprocessor/resource_schema.go
index c7955f610b..d7405a233c 100644
--- a/internal/service/streamprocessor/resource_schema.go
+++ b/internal/service/streamprocessor/resource_schema.go
@@ -95,6 +95,11 @@ func ResourceSchema(ctx context.Context) schema.Schema {
Computed: true,
MarkdownDescription: "The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.",
},
+ "tier": schema.StringAttribute{
+ Optional: true,
+ Computed: true,
+				MarkdownDescription: "Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures memory and vCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50.",
+ },
"timeouts": timeouts.Attributes(ctx, timeouts.Opts{
Create: true,
}),
@@ -120,6 +125,7 @@ type TFStreamProcessorRSModel struct {
ProjectID types.String `tfsdk:"project_id"`
State types.String `tfsdk:"state"`
Stats types.String `tfsdk:"stats"`
+ Tier types.String `tfsdk:"tier"`
Timeouts timeouts.Value `tfsdk:"timeouts"`
DeleteOnCreateTimeout types.Bool `tfsdk:"delete_on_create_timeout"`
}
@@ -155,6 +161,7 @@ type TFStreamProcessorDSModel struct {
ProjectID types.String `tfsdk:"project_id"`
State types.String `tfsdk:"state"`
Stats types.String `tfsdk:"stats"`
+ Tier types.String `tfsdk:"tier"`
}
type TFStreamProcessorsDSModel struct {
diff --git a/internal/service/streamprocessor/resource_test.go b/internal/service/streamprocessor/resource_test.go
index dad4b8cc0a..c4a49f43b8 100644
--- a/internal/service/streamprocessor/resource_test.go
+++ b/internal/service/streamprocessor/resource_test.go
@@ -53,6 +53,67 @@ func TestAccStreamProcessor_basic(t *testing.T) {
resource.ParallelTest(t, *basicTestCase(t))
}
+func TestAccStreamProcessor_withTier(t *testing.T) {
+ var (
+ projectID, workspaceName = acc.ProjectIDExecutionWithStreamInstance(t)
+ randomSuffix = acctest.RandString(5)
+ processorName = "new-processor-tier" + randomSuffix
+ tier = "SP30"
+ )
+
+ resource.ParallelTest(t, resource.TestCase{
+ PreCheck: func() { acc.PreCheckBasic(t) },
+ ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
+ CheckDestroy: checkDestroyStreamProcessor,
+ Steps: []resource.TestStep{
+ {
+ Config: configWithTier(t, projectID, workspaceName, processorName, tier),
+ Check: resource.ComposeAggregateTestCheckFunc(
+ resource.TestCheckResourceAttrSet(resourceName, "id"),
+ ),
+ },
+ {
+ Config: configWithTier(t, projectID, workspaceName, processorName, "SP50"),
+ Check: resource.ComposeAggregateTestCheckFunc(
+ resource.TestCheckResourceAttrSet(resourceName, "id"),
+ ),
+ },
+ importStep(),
+ }})
+}
+
+func TestAccStreamProcessor_tierUpdate(t *testing.T) {
+ var (
+ projectID, workspaceName = acc.ProjectIDExecutionWithStreamInstance(t)
+ randomSuffix = acctest.RandString(5)
+ processorName = "tier-update-processor" + randomSuffix
+ )
+
+ resource.ParallelTest(t, resource.TestCase{
+ PreCheck: func() { acc.PreCheckBasic(t) },
+ ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
+ CheckDestroy: checkDestroyStreamProcessor,
+ Steps: []resource.TestStep{
+ {
+ Config: configWithTier(t, projectID, workspaceName, processorName, "SP30"),
+ Check: resource.ComposeAggregateTestCheckFunc(
+ resource.TestCheckResourceAttrSet(resourceName, "id"),
+ resource.TestCheckResourceAttrSet(resourceName, "stats"),
+ ),
+ },
+ {
+ // Update the stream processor tier to SP50
+ Config: configWithTier(t, projectID, workspaceName, processorName, "SP50"),
+ Check: resource.ComposeAggregateTestCheckFunc(
+ resource.TestCheckResourceAttrSet(resourceName, "id"),
+ resource.TestCheckResourceAttrSet(resourceName, "stats"),
+ ),
+ },
+ importStep(),
+ },
+ })
+}
+
func basicTestCase(t *testing.T) *resource.TestCase {
t.Helper()
var (
@@ -673,6 +734,43 @@ func config(t *testing.T, projectID, workspaceName, processorName, state, nameSu
`, projectID, workspaceName, processorName, pipeline, stateConfig, optionsStr, dependsOnStr, timeoutConfig, deleteOnCreateTimeoutConfig) + otherConfig
}
+func configWithTier(t *testing.T, projectID, workspaceName, processorName, tier string) string {
+ t.Helper()
+
+ return fmt.Sprintf(`
+ data "mongodbatlas_stream_connection" "sample_stream_solar" {
+ project_id = %[1]q
+ workspace_name = %[2]q
+ connection_name = "sample_stream_solar"
+ }
+
+ resource "mongodbatlas_stream_processor" "processor" {
+ project_id = %[1]q
+ workspace_name = %[2]q
+ processor_name = %[3]q
+ pipeline = jsonencode([
+ { "$source" = { "connectionName" = data.mongodbatlas_stream_connection.sample_stream_solar.connection_name } },
+ { "$emit" = { "connectionName" = "__testLog" } }
+ ])
+ state = "STARTED"
+ tier = %[4]q
+ }
+
+ data "mongodbatlas_stream_processor" "test" {
+ project_id = %[1]q
+ workspace_name = %[2]q
+ processor_name = %[3]q
+ depends_on = [mongodbatlas_stream_processor.processor]
+ }
+
+ data "mongodbatlas_stream_processors" "test" {
+ project_id = %[1]q
+ workspace_name = %[2]q
+ depends_on = [mongodbatlas_stream_processor.processor]
+ }
+ `, projectID, workspaceName, processorName, tier)
+}
+
func configMigration(t *testing.T, projectID, instanceName, processorName, state, nameSuffix string, src, dest connectionConfig, timeoutConfig string, deleteOnCreateTimeout *bool) string {
t.Helper()
stateConfig := ""