diff --git a/.changelog/3969.txt b/.changelog/3969.txt new file mode 100644 index 0000000000..8abef0fe1c --- /dev/null +++ b/.changelog/3969.txt @@ -0,0 +1,11 @@ +```release-note:enhancement +resource/mongodbatlas_stream_processor: Adds the `tier` attribute +``` + +```release-note:enhancement +data-source/mongodbatlas_stream_processor: Adds the `tier` attribute +``` + +```release-note:enhancement +data-source/mongodbatlas_stream_processors: Adds the `tier` attribute +``` diff --git a/docs/data-sources/stream_processor.md b/docs/data-sources/stream_processor.md index 7df9dfc9af..84d1e19aa5 100644 --- a/docs/data-sources/stream_processor.md +++ b/docs/data-sources/stream_processor.md @@ -64,6 +64,7 @@ resource "mongodbatlas_stream_processor" "stream-processor-sample-example" { { "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } } ]) state = "STARTED" + tier = "SP30" } resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" { @@ -140,6 +141,7 @@ output "stream_processors_results" { **NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion. - `stats` (String) The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information. +- `tier` (String) Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures Memory / VCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50. ### Nested Schema for `options` diff --git a/docs/data-sources/stream_processors.md b/docs/data-sources/stream_processors.md index 88c7afb24d..7cf8b76368 100644 --- a/docs/data-sources/stream_processors.md +++ b/docs/data-sources/stream_processors.md @@ -64,6 +64,7 @@ resource "mongodbatlas_stream_processor" "stream-processor-sample-example" { { "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } } ]) state = "STARTED" + tier = "SP30" } resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" { @@ -155,6 +156,7 @@ Read-Only: **NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion. - `stats` (String) The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information. +- `tier` (String) Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures Memory / VCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50. - `workspace_name` (String) Label that identifies the stream processing workspace. diff --git a/docs/resources/stream_processor.md b/docs/resources/stream_processor.md index 45cbb8cfcd..6395083eee 100644 --- a/docs/resources/stream_processor.md +++ b/docs/resources/stream_processor.md @@ -70,6 +70,7 @@ resource "mongodbatlas_stream_processor" "stream-processor-sample-example" { { "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } } ]) state = "STARTED" + tier = "SP30" } resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" { @@ -144,6 +145,7 @@ output "stream_processors_results" { - `state` (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`. When a Stream Processor is created without specifying the state, it will default to `CREATED` state. When a Stream Processor is updated without specifying the state, it will default to the Previous state. **NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion. +- `tier` (String) Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures Memory / VCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50. - `timeouts` (Attributes) (see [below for nested schema](#nestedatt--timeouts)) - `workspace_name` (String) Label that identifies the stream processing workspace. diff --git a/examples/mongodbatlas_stream_processor/main.tf b/examples/mongodbatlas_stream_processor/main.tf index 97536848e6..37537cdb19 100644 --- a/examples/mongodbatlas_stream_processor/main.tf +++ b/examples/mongodbatlas_stream_processor/main.tf @@ -54,6 +54,7 @@ resource "mongodbatlas_stream_processor" "stream-processor-sample-example" { { "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } } ]) state = "STARTED" + tier = "SP30" } resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" { diff --git a/internal/service/streamprocessor/model.go b/internal/service/streamprocessor/model.go index 009e6f6b4e..6cd05bd361 100644 --- a/internal/service/streamprocessor/model.go +++ b/internal/service/streamprocessor/model.go @@ -114,6 +114,7 @@ func NewStreamProcessorWithStats(ctx context.Context, projectID, instanceName, w ProjectID: types.StringPointerValue(&projectID), State: types.StringPointerValue(&apiResp.State), Stats: statsTF, + Tier: types.StringPointerValue(apiResp.Tier), } if workspaceName != "" { @@ -158,6 +159,7 @@ func NewTFStreamprocessorDSModel(ctx context.Context, projectID, instanceName, w ProjectID: types.StringPointerValue(&projectID), State: types.StringPointerValue(&apiResp.State), Stats: statsTF, + Tier: types.StringPointerValue(apiResp.Tier), } if workspaceName != "" { diff --git a/internal/service/streamprocessor/resource.go b/internal/service/streamprocessor/resource.go index eb42e305e2..6dca7db7b6 100644 --- a/internal/service/streamprocessor/resource.go +++ b/internal/service/streamprocessor/resource.go @@ -107,13 +107,12 @@ func (r *streamProcessorRS) Create(ctx context.Context, req resource.CreateReque } if needsStarting { - _, err := connV2.StreamsApi.StartStreamProcessorWithParams(ctx, - &admin.StartStreamProcessorApiParams{ - GroupId: projectID, - TenantName: workspaceOrInstanceName, - ProcessorName: processorName, - }, - ).Execute() + startWithOptions := &admin.StreamsStartStreamProcessorWith{} + if plan.Tier.ValueString() != "" { + startWithOptions.SetTier(plan.Tier.ValueString()) + } + + _, err := connV2.StreamsApi.StartStreamProcessorWith(ctx, projectID, workspaceOrInstanceName, processorName, startWithOptions).Execute() if err != nil { resp.Diagnostics.AddError(errorCreateStart, err.Error()) return @@ -237,13 +236,12 @@ func (r *streamProcessorRS) Update(ctx context.Context, req resource.UpdateReque // start the stream processor if the desired state is started if plannedState == StartedState { - _, err := r.Client.AtlasV2.StreamsApi.StartStreamProcessorWithParams(ctx, - &admin.StartStreamProcessorApiParams{ - GroupId: projectID, - TenantName: workspaceOrInstanceName, - ProcessorName: processorName, - }, - ).Execute() + startWithOptions := &admin.StreamsStartStreamProcessorWith{} + if plan.Tier.ValueString() != "" { + startWithOptions.SetTier(plan.Tier.ValueString()) + } + + _, err := r.Client.AtlasV2.StreamsApi.StartStreamProcessorWith(ctx, projectID, workspaceOrInstanceName, processorName, startWithOptions).Execute() if err != nil { resp.Diagnostics.AddError("Error starting stream processor", err.Error()) return @@ -257,6 +255,15 @@ func (r *streamProcessorRS) Update(ctx context.Context, req resource.UpdateReque } } + // Get the current state if the processor was not restarted + if streamProcessorResp == nil { + streamProcessorResp, _, err = r.Client.AtlasV2.StreamsApi.GetStreamProcessorWithParams(ctx, requestParams).Execute() + if err != nil { + resp.Diagnostics.AddError("Error reading updated stream processor", err.Error()) + return + } + } + instanceName := plan.InstanceName.ValueString() workspaceName := plan.WorkspaceName.ValueString() newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, workspaceName, streamProcessorResp, &plan.Timeouts, &plan.DeleteOnCreateTimeout) diff --git a/internal/service/streamprocessor/resource_schema.go b/internal/service/streamprocessor/resource_schema.go index c7955f610b..d7405a233c 100644 --- a/internal/service/streamprocessor/resource_schema.go +++ b/internal/service/streamprocessor/resource_schema.go @@ -95,6 +95,11 @@ func ResourceSchema(ctx context.Context) schema.Schema { Computed: true, MarkdownDescription: "The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.", }, + "tier": schema.StringAttribute{ + Optional: true, + Computed: true, + MarkdownDescription: "Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures Memory / VCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50.", + }, "timeouts": timeouts.Attributes(ctx, timeouts.Opts{ Create: true, }), @@ -120,6 +125,7 @@ type TFStreamProcessorRSModel struct { ProjectID types.String `tfsdk:"project_id"` State types.String `tfsdk:"state"` Stats types.String `tfsdk:"stats"` + Tier types.String `tfsdk:"tier"` Timeouts timeouts.Value `tfsdk:"timeouts"` DeleteOnCreateTimeout types.Bool `tfsdk:"delete_on_create_timeout"` } @@ -155,6 +161,7 @@ type TFStreamProcessorDSModel struct { ProjectID types.String `tfsdk:"project_id"` State types.String `tfsdk:"state"` Stats types.String `tfsdk:"stats"` + Tier types.String `tfsdk:"tier"` } type TFStreamProcessorsDSModel struct { diff --git a/internal/service/streamprocessor/resource_test.go b/internal/service/streamprocessor/resource_test.go index dad4b8cc0a..c4a49f43b8 100644 --- a/internal/service/streamprocessor/resource_test.go +++ b/internal/service/streamprocessor/resource_test.go @@ -53,6 +53,67 @@ func TestAccStreamProcessor_basic(t *testing.T) { resource.ParallelTest(t, *basicTestCase(t)) } +func TestAccStreamProcessor_withTier(t *testing.T) { + var ( + projectID, workspaceName = acc.ProjectIDExecutionWithStreamInstance(t) + randomSuffix = acctest.RandString(5) + processorName = "new-processor-tier" + randomSuffix + tier = "SP30" + ) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acc.PreCheckBasic(t) }, + ProtoV6ProviderFactories: acc.TestAccProviderV6Factories, + CheckDestroy: checkDestroyStreamProcessor, + Steps: []resource.TestStep{ + { + Config: configWithTier(t, projectID, workspaceName, processorName, tier), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttrSet(resourceName, "id"), + ), + }, + { + Config: configWithTier(t, projectID, workspaceName, processorName, "SP50"), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttrSet(resourceName, "id"), + ), + }, + importStep(), + }}) +} + +func TestAccStreamProcessor_tierUpdate(t *testing.T) { + var ( + projectID, workspaceName = acc.ProjectIDExecutionWithStreamInstance(t) + randomSuffix = acctest.RandString(5) + processorName = "tier-update-processor" + randomSuffix + ) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acc.PreCheckBasic(t) }, + ProtoV6ProviderFactories: acc.TestAccProviderV6Factories, + CheckDestroy: checkDestroyStreamProcessor, + Steps: []resource.TestStep{ + { + Config: configWithTier(t, projectID, workspaceName, processorName, "SP30"), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttrSet(resourceName, "id"), + resource.TestCheckResourceAttrSet(resourceName, "stats"), + ), + }, + { + // Update the stream processor tier to SP50 + Config: configWithTier(t, projectID, workspaceName, processorName, "SP50"), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttrSet(resourceName, "id"), + resource.TestCheckResourceAttrSet(resourceName, "stats"), + ), + }, + importStep(), + }, + }) +} + func basicTestCase(t *testing.T) *resource.TestCase { t.Helper() var ( @@ -673,6 +734,43 @@ func config(t *testing.T, projectID, workspaceName, processorName, state, nameSu `, projectID, workspaceName, processorName, pipeline, stateConfig, optionsStr, dependsOnStr, timeoutConfig, deleteOnCreateTimeoutConfig) + otherConfig } +func configWithTier(t *testing.T, projectID, workspaceName, processorName, tier string) string { + t.Helper() + + return fmt.Sprintf(` + data "mongodbatlas_stream_connection" "sample_stream_solar" { + project_id = %[1]q + workspace_name = %[2]q + connection_name = "sample_stream_solar" + } + + resource "mongodbatlas_stream_processor" "processor" { + project_id = %[1]q + workspace_name = %[2]q + processor_name = %[3]q + pipeline = jsonencode([ + { "$source" = { "connectionName" = data.mongodbatlas_stream_connection.sample_stream_solar.connection_name } }, + { "$emit" = { "connectionName" = "__testLog" } } + ]) + state = "STARTED" + tier = %[4]q + } + + data "mongodbatlas_stream_processor" "test" { + project_id = %[1]q + workspace_name = %[2]q + processor_name = %[3]q + depends_on = [mongodbatlas_stream_processor.processor] + } + + data "mongodbatlas_stream_processors" "test" { + project_id = %[1]q + workspace_name = %[2]q + depends_on = [mongodbatlas_stream_processor.processor] + } + `, projectID, workspaceName, processorName, tier) +} + func configMigration(t *testing.T, projectID, instanceName, processorName, state, nameSuffix string, src, dest connectionConfig, timeoutConfig string, deleteOnCreateTimeout *bool) string { t.Helper() stateConfig := ""