-
Notifications
You must be signed in to change notification settings - Fork 211
feat: Adds tier attribute to stream processor resource and datasource #3969
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
e29e4f4
64f909b
cea1c8f
ca47664
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| ```release-note:enhancement | ||
| resource/mongodbatlas_stream_processor: Adds the `tier` attribute | ||
| ``` | ||
|
|
||
| ```release-note:enhancement | ||
| data-source/mongodbatlas_stream_processor: Adds the `tier` attribute | ||
| ``` | ||
|
|
||
| ```release-note:enhancement | ||
| data-source/mongodbatlas_stream_processors: Adds the `tier` attribute | ||
| ``` | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -107,13 +107,12 @@ func (r *streamProcessorRS) Create(ctx context.Context, req resource.CreateReque | |
| } | ||
|
|
||
| if needsStarting { | ||
| _, err := connV2.StreamsApi.StartStreamProcessorWithParams(ctx, | ||
| &admin.StartStreamProcessorApiParams{ | ||
| GroupId: projectID, | ||
| TenantName: workspaceOrInstanceName, | ||
| ProcessorName: processorName, | ||
| }, | ||
| ).Execute() | ||
| startWithOptions := &admin.StreamsStartStreamProcessorWith{} | ||
| if plan.Tier.ValueString() != "" { | ||
| startWithOptions.SetTier(plan.Tier.ValueString()) | ||
| } | ||
|
|
||
| _, err := connV2.StreamsApi.StartStreamProcessorWith(ctx, projectID, workspaceOrInstanceName, processorName, startWithOptions).Execute() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Create function is getting very long, consider extracting some parts into functions
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll create a follow-up ticket to explore this. I believe we have an existing action item on our end to go through this api flow but I'll reply back here |
||
| if err != nil { | ||
| resp.Diagnostics.AddError(errorCreateStart, err.Error()) | ||
| return | ||
|
|
@@ -237,13 +236,12 @@ func (r *streamProcessorRS) Update(ctx context.Context, req resource.UpdateReque | |
|
|
||
| // start the stream processor if the desired state is started | ||
| if plannedState == StartedState { | ||
| _, err := r.Client.AtlasV2.StreamsApi.StartStreamProcessorWithParams(ctx, | ||
| &admin.StartStreamProcessorApiParams{ | ||
| GroupId: projectID, | ||
| TenantName: workspaceOrInstanceName, | ||
| ProcessorName: processorName, | ||
| }, | ||
| ).Execute() | ||
| startWithOptions := &admin.StreamsStartStreamProcessorWith{} | ||
| if plan.Tier.ValueString() != "" { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like you're only changing Create, can the attribute be Updated? If it can't, it should have the customplanmodifier.CreateOnly() plan modifier. With the current implementation, I think that if customer tries to change the tier value, they will get an "inconsistency error" because the updated value won't match the initial value
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If it can, would also consider how it behaves if a stream processor is configured as started vs stopped when modifying
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The attribute can be updated. I had a couple of checks that I updated and added tests for the same. I believe the logic is complete for Create and Update now |
||
| startWithOptions.SetTier(plan.Tier.ValueString()) | ||
| } | ||
|
|
||
| _, err := r.Client.AtlasV2.StreamsApi.StartStreamProcessorWith(ctx, projectID, workspaceOrInstanceName, processorName, startWithOptions).Execute() | ||
| if err != nil { | ||
| resp.Diagnostics.AddError("Error starting stream processor", err.Error()) | ||
| return | ||
|
|
@@ -257,6 +255,15 @@ func (r *streamProcessorRS) Update(ctx context.Context, req resource.UpdateReque | |
| } | ||
| } | ||
|
|
||
| // Get the current state if the processor was not restarted | ||
| if streamProcessorResp == nil { | ||
| streamProcessorResp, _, err = r.Client.AtlasV2.StreamsApi.GetStreamProcessorWithParams(ctx, requestParams).Execute() | ||
| if err != nil { | ||
| resp.Diagnostics.AddError("Error reading updated stream processor", err.Error()) | ||
| return | ||
| } | ||
| } | ||
|
|
||
| instanceName := plan.InstanceName.ValueString() | ||
| workspaceName := plan.WorkspaceName.ValueString() | ||
| newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, workspaceName, streamProcessorResp, &plan.Timeouts, &plan.DeleteOnCreateTimeout) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -95,6 +95,14 @@ func ResourceSchema(ctx context.Context) schema.Schema { | |
| Computed: true, | ||
| MarkdownDescription: "The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.", | ||
| }, | ||
| "tier": schema.StringAttribute{ | ||
| Optional: true, | ||
| Computed: true, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it Computed, because Atlas will return the value if not provided by client?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes that is correct. Today each stream processor has a tier set even without user input. And we have made changes to return the tier. But now the client can provide the tier to override that |
||
| MarkdownDescription: "Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures Memory / VCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50.", | ||
| PlanModifiers: []planmodifier.String{ | ||
| stringplanmodifier.UseStateForUnknown(), | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. q: why are we using UseStateForUnknown?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is optional to provide, my thought was that if a user removes the For our apis, when a user stops/starts a stream processor, we always use the last tier that the stream processor was running on unless the user explicitly provides an option to override it i.e. we won't go back to the workspace default unless the user explicitly specifies to do so
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Since the attribute is Optional + Computed this will be the default behaviour. When the attribute is removed no terraform plan is displayed, preserving the existing value in the state.
|
||
| }, | ||
| }, | ||
| "timeouts": timeouts.Attributes(ctx, timeouts.Opts{ | ||
| Create: true, | ||
| }), | ||
|
|
@@ -120,6 +128,7 @@ type TFStreamProcessorRSModel struct { | |
| ProjectID types.String `tfsdk:"project_id"` | ||
| State types.String `tfsdk:"state"` | ||
| Stats types.String `tfsdk:"stats"` | ||
| Tier types.String `tfsdk:"tier"` | ||
| Timeouts timeouts.Value `tfsdk:"timeouts"` | ||
| DeleteOnCreateTimeout types.Bool `tfsdk:"delete_on_create_timeout"` | ||
| } | ||
|
|
@@ -155,6 +164,7 @@ type TFStreamProcessorDSModel struct { | |
| ProjectID types.String `tfsdk:"project_id"` | ||
| State types.String `tfsdk:"state"` | ||
| Stats types.String `tfsdk:"stats"` | ||
| Tier types.String `tfsdk:"tier"` | ||
| } | ||
|
|
||
| type TFStreamProcessorsDSModel struct { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,6 +53,67 @@ func TestAccStreamProcessor_basic(t *testing.T) { | |
| resource.ParallelTest(t, *basicTestCase(t)) | ||
| } | ||
|
|
||
| func TestAccStreamProcessor_withTier(t *testing.T) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| var ( | ||
| projectID, workspaceName = acc.ProjectIDExecutionWithStreamInstance(t) | ||
| randomSuffix = acctest.RandString(5) | ||
| processorName = "new-processor-tier" + randomSuffix | ||
| tier = "SP30" | ||
| ) | ||
|
|
||
| resource.ParallelTest(t, resource.TestCase{ | ||
| PreCheck: func() { acc.PreCheckBasic(t) }, | ||
| ProtoV6ProviderFactories: acc.TestAccProviderV6Factories, | ||
| CheckDestroy: checkDestroyStreamProcessor, | ||
| Steps: []resource.TestStep{ | ||
| { | ||
| Config: configWithTier(t, projectID, workspaceName, processorName, tier), | ||
| Check: resource.ComposeAggregateTestCheckFunc( | ||
| resource.TestCheckResourceAttrSet(resourceName, "id"), | ||
| ), | ||
| }, | ||
| { | ||
| Config: configWithTier(t, projectID, workspaceName, processorName, "SP50"), | ||
| Check: resource.ComposeAggregateTestCheckFunc( | ||
| resource.TestCheckResourceAttrSet(resourceName, "id"), | ||
| ), | ||
| }, | ||
| importStep(), | ||
| }}) | ||
| } | ||
|
|
||
| func TestAccStreamProcessor_tierUpdate(t *testing.T) { | ||
| var ( | ||
| projectID, workspaceName = acc.ProjectIDExecutionWithStreamInstance(t) | ||
| randomSuffix = acctest.RandString(5) | ||
| processorName = "tier-update-processor" + randomSuffix | ||
| ) | ||
|
|
||
| resource.ParallelTest(t, resource.TestCase{ | ||
| PreCheck: func() { acc.PreCheckBasic(t) }, | ||
| ProtoV6ProviderFactories: acc.TestAccProviderV6Factories, | ||
| CheckDestroy: checkDestroyStreamProcessor, | ||
| Steps: []resource.TestStep{ | ||
| { | ||
| Config: configWithTier(t, projectID, workspaceName, processorName, "SP30"), | ||
| Check: resource.ComposeAggregateTestCheckFunc( | ||
| resource.TestCheckResourceAttrSet(resourceName, "id"), | ||
| resource.TestCheckResourceAttrSet(resourceName, "stats"), | ||
| ), | ||
| }, | ||
| { | ||
| // Update the stream processor tier to SP50 | ||
| Config: configWithTier(t, projectID, workspaceName, processorName, "SP50"), | ||
| Check: resource.ComposeAggregateTestCheckFunc( | ||
| resource.TestCheckResourceAttrSet(resourceName, "id"), | ||
| resource.TestCheckResourceAttrSet(resourceName, "stats"), | ||
| ), | ||
| }, | ||
| importStep(), | ||
| }, | ||
| }) | ||
| } | ||
|
|
||
| func basicTestCase(t *testing.T) *resource.TestCase { | ||
| t.Helper() | ||
| var ( | ||
|
|
@@ -673,6 +734,43 @@ func config(t *testing.T, projectID, workspaceName, processorName, state, nameSu | |
| `, projectID, workspaceName, processorName, pipeline, stateConfig, optionsStr, dependsOnStr, timeoutConfig, deleteOnCreateTimeoutConfig) + otherConfig | ||
| } | ||
|
|
||
| func configWithTier(t *testing.T, projectID, workspaceName, processorName, tier string) string { | ||
| t.Helper() | ||
|
|
||
| return fmt.Sprintf(` | ||
| data "mongodbatlas_stream_connection" "sample_stream_solar" { | ||
| project_id = %[1]q | ||
| workspace_name = %[2]q | ||
| connection_name = "sample_stream_solar" | ||
| } | ||
|
|
||
| resource "mongodbatlas_stream_processor" "processor" { | ||
| project_id = %[1]q | ||
| workspace_name = %[2]q | ||
| processor_name = %[3]q | ||
| pipeline = jsonencode([ | ||
| { "$source" = { "connectionName" = data.mongodbatlas_stream_connection.sample_stream_solar.connection_name } }, | ||
| { "$emit" = { "connectionName" = "__testLog" } } | ||
| ]) | ||
| state = "STARTED" | ||
| tier = %[4]q | ||
| } | ||
|
|
||
| data "mongodbatlas_stream_processor" "test" { | ||
| project_id = %[1]q | ||
| workspace_name = %[2]q | ||
| processor_name = %[3]q | ||
| depends_on = [mongodbatlas_stream_processor.processor] | ||
| } | ||
|
|
||
| data "mongodbatlas_stream_processors" "test" { | ||
| project_id = %[1]q | ||
| workspace_name = %[2]q | ||
| depends_on = [mongodbatlas_stream_processor.processor] | ||
| } | ||
| `, projectID, workspaceName, processorName, tier) | ||
| } | ||
|
|
||
| func configMigration(t *testing.T, projectID, instanceName, processorName, state, nameSuffix string, src, dest connectionConfig, timeoutConfig string, deleteOnCreateTimeout *bool) string { | ||
| t.Helper() | ||
| stateConfig := "" | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.