11 changes: 11 additions & 0 deletions .changelog/3969.txt
@@ -0,0 +1,11 @@
```release-note:enhancement
resource/mongodbatlas_stream_processor: Adds the `tier` attribute
```

```release-note:enhancement
data-source/mongodbatlas_stream_processor: Adds the `tier` attribute
```

```release-note:enhancement
data-source/mongodbatlas_stream_processors: Adds the `tier` attribute
```
2 changes: 2 additions & 0 deletions docs/data-sources/stream_processor.md
@@ -64,6 +64,7 @@ resource "mongodbatlas_stream_processor" "stream-processor-sample-example" {
{ "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } }
])
state = "STARTED"
tier = "SP30"
}
resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" {
@@ -140,6 +141,7 @@ output "stream_processors_results" {

**NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.
- `stats` (String) The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.
- `tier` (String) Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures Memory / VCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50.

<a id="nestedatt--options"></a>
### Nested Schema for `options`
2 changes: 2 additions & 0 deletions docs/data-sources/stream_processors.md
@@ -64,6 +64,7 @@ resource "mongodbatlas_stream_processor" "stream-processor-sample-example" {
{ "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } }
])
state = "STARTED"
tier = "SP30"
}

resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" {
@@ -155,6 +156,7 @@ Read-Only:

**NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.
- `stats` (String) The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.
- `tier` (String) Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures Memory / VCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50.
- `workspace_name` (String) Label that identifies the stream processing workspace.

<a id="nestedatt--results--options"></a>
2 changes: 2 additions & 0 deletions docs/resources/stream_processor.md
@@ -70,6 +70,7 @@ resource "mongodbatlas_stream_processor" "stream-processor-sample-example" {
{ "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } }
])
state = "STARTED"
tier = "SP30"
}

resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" {
@@ -144,6 +145,7 @@ output "stream_processors_results" {
- `state` (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`. When a Stream Processor is created without specifying the state, it will default to `CREATED` state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.

**NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.
- `tier` (String) Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures Memory / VCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50.
- `timeouts` (Attributes) (see [below for nested schema](#nestedatt--timeouts))
- `workspace_name` (String) Label that identifies the stream processing workspace.

1 change: 1 addition & 0 deletions examples/mongodbatlas_stream_processor/main.tf
@@ -54,6 +54,7 @@ resource "mongodbatlas_stream_processor" "stream-processor-sample-example" {
{ "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } }
])
state = "STARTED"
tier = "SP30"
}

resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" {
2 changes: 2 additions & 0 deletions internal/service/streamprocessor/model.go
@@ -114,6 +114,7 @@ func NewStreamProcessorWithStats(ctx context.Context, projectID, instanceName, w
ProjectID: types.StringPointerValue(&projectID),
State: types.StringPointerValue(&apiResp.State),
Stats: statsTF,
Tier: types.StringPointerValue(apiResp.Tier),
}

if workspaceName != "" {
@@ -158,6 +159,7 @@ func NewTFStreamprocessorDSModel(ctx context.Context, projectID, instanceName, w
ProjectID: types.StringPointerValue(&projectID),
State: types.StringPointerValue(&apiResp.State),
Stats: statsTF,
Tier: types.StringPointerValue(apiResp.Tier),
}

if workspaceName != "" {
35 changes: 21 additions & 14 deletions internal/service/streamprocessor/resource.go
@@ -107,13 +107,12 @@ func (r *streamProcessorRS) Create(ctx context.Context, req resource.CreateReque
}

if needsStarting {
_, err := connV2.StreamsApi.StartStreamProcessorWithParams(ctx,
&admin.StartStreamProcessorApiParams{
GroupId: projectID,
TenantName: workspaceOrInstanceName,
ProcessorName: processorName,
},
).Execute()
startWithOptions := &admin.StreamsStartStreamProcessorWith{}
if plan.Tier.ValueString() != "" {
startWithOptions.SetTier(plan.Tier.ValueString())
}

_, err := connV2.StreamsApi.StartStreamProcessorWith(ctx, projectID, workspaceOrInstanceName, processorName, startWithOptions).Execute()
**Review comment (Member):** The Create function is getting very long; consider extracting some parts into functions.

**Reply (Author, Collaborator):** I'll create a follow-up ticket to explore this. I believe we have an existing action item on our end to go through this API flow, but I'll reply back here.
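A minimal sketch of the extraction suggested above, assuming the helper would live alongside the existing Create/Update code in resource.go and reuse that file's imports; the helper name and parameter list are illustrative assumptions, not part of this PR:

```go
// startProcessor is a hypothetical helper that both Create and Update could call.
// It mirrors the tier handling shown in this diff: only set a tier override when
// the plan provides one, otherwise let Atlas keep the processor's current tier.
// Error handling and diagnostics remain with the caller.
func startProcessor(ctx context.Context, connV2 *admin.APIClient, projectID, tenantName, processorName, tier string) error {
	startWith := &admin.StreamsStartStreamProcessorWith{}
	if tier != "" {
		startWith.SetTier(tier)
	}
	_, err := connV2.StreamsApi.StartStreamProcessorWith(ctx, projectID, tenantName, processorName, startWith).Execute()
	return err
}
```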

if err != nil {
resp.Diagnostics.AddError(errorCreateStart, err.Error())
return
@@ -237,13 +236,12 @@ func (r *streamProcessorRS) Update(ctx context.Context, req resource.UpdateReque

// start the stream processor if the desired state is started
if plannedState == StartedState {
_, err := r.Client.AtlasV2.StreamsApi.StartStreamProcessorWithParams(ctx,
&admin.StartStreamProcessorApiParams{
GroupId: projectID,
TenantName: workspaceOrInstanceName,
ProcessorName: processorName,
},
).Execute()
startWithOptions := &admin.StreamsStartStreamProcessorWith{}
if plan.Tier.ValueString() != "" {
**Review comment (Member):** It looks like you're only changing Create; can the attribute be updated?

If it can't, it should have the `customplanmodifier.CreateOnly()` plan modifier. With the current implementation, I think that if a customer tries to change the tier value, they will get an "inconsistency error" because the updated value won't match the initial value.

**Review comment (Member):**

> can the attribute be Updated?

If it can, I would also consider how it behaves if a stream processor is configured as started vs. stopped when modifying the tier.

**Reply (Author, Collaborator):** The attribute can be updated. I had a couple of checks that I updated and added tests for the same. I believe the logic is complete for Create and Update now.
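For reference, a sketch of what the suggested guard might look like if `tier` could not be changed after creation. Since the attribute turned out to be updatable, this does not apply to the final implementation; it also assumes the provider's internal `customplanmodifier.CreateOnly()` (named in the comment above) can be used as a `planmodifier.String`:

```go
// Hypothetical fragment of ResourceSchema, only relevant if tier were immutable.
// customplanmodifier.CreateOnly() would reject any plan that changes the value
// after the resource exists, avoiding the "inconsistency error" mentioned above.
"tier": schema.StringAttribute{
	Optional: true,
	Computed: true,
	PlanModifiers: []planmodifier.String{
		customplanmodifier.CreateOnly(),
	},
},
```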

startWithOptions.SetTier(plan.Tier.ValueString())
}

_, err := r.Client.AtlasV2.StreamsApi.StartStreamProcessorWith(ctx, projectID, workspaceOrInstanceName, processorName, startWithOptions).Execute()
if err != nil {
resp.Diagnostics.AddError("Error starting stream processor", err.Error())
return
@@ -257,6 +255,15 @@ func (r *streamProcessorRS) Update(ctx context.Context, req resource.UpdateReque
}
}

// Get the current state if the processor was not restarted
if streamProcessorResp == nil {
streamProcessorResp, _, err = r.Client.AtlasV2.StreamsApi.GetStreamProcessorWithParams(ctx, requestParams).Execute()
if err != nil {
resp.Diagnostics.AddError("Error reading updated stream processor", err.Error())
return
}
}

instanceName := plan.InstanceName.ValueString()
workspaceName := plan.WorkspaceName.ValueString()
newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, workspaceName, streamProcessorResp, &plan.Timeouts, &plan.DeleteOnCreateTimeout)
10 changes: 10 additions & 0 deletions internal/service/streamprocessor/resource_schema.go
@@ -95,6 +95,14 @@ func ResourceSchema(ctx context.Context) schema.Schema {
Computed: true,
MarkdownDescription: "The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.",
},
"tier": schema.StringAttribute{
Optional: true,
Computed: true,
**Review comment (Member):** Is it Computed because Atlas will return the value if it is not provided by the client?

**Reply (Author, Collaborator):** Yes, that is correct. Today each stream processor has a tier set even without user input, and we have made changes to return the tier. But now the client can provide the tier to override that.

MarkdownDescription: "Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures Memory / VCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50.",
PlanModifiers: []planmodifier.String{
stringplanmodifier.UseStateForUnknown(),
**Review comment (Member):** Q: why are we using UseStateForUnknown?

**Reply (Author, Collaborator):** Since this is optional to provide, my thought was that if a user removes the tier from the config, we should not show a plan change. If I have unnecessarily used or missed UseStateForUnknown, I can remove it.

For our APIs, when a user stops/starts a stream processor, we always use the last tier that the stream processor was running on unless the user explicitly provides an option to override it, i.e. we won't go back to the workspace default unless the user explicitly specifies to do so.

**Review comment (Member):**

> my thought was that if a user removes the tier from the config, we should not show a plan change

Since the attribute is Optional + Computed, this is already the default behaviour: when the attribute is removed from the config, no plan change is displayed and the existing value is preserved in the state.

UseStateForUnknown is for a different scenario, primarily to reduce "known after apply" plan verbosity, and should only be used for computed attributes whose value is guaranteed not to change when not defined. If that assumption does not hold, users can receive "inconsistent result after apply" errors. If you believe we can hold this requirement we can keep it; otherwise I would simply suggest removing it.
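A sketch of the alternative suggested above, i.e. the same attribute relying on Optional + Computed alone; whether the "value never changes when not defined" guarantee holds for tier is the open question in this thread:

```go
// Fragment of ResourceSchema with the plan modifier removed, per the suggestion above.
// Optional + Computed already preserves the prior state value when the attribute is
// dropped from the config, so UseStateForUnknown is not needed for that behaviour.
"tier": schema.StringAttribute{
	Optional: true,
	Computed: true,
	MarkdownDescription: "Selected tier to start a stream processor on rather than defaulting to the workspace setting. Configures Memory / VCPU allowances. Valid options are SP2, SP5, SP10, SP30, and SP50.",
},
```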

},
},
"timeouts": timeouts.Attributes(ctx, timeouts.Opts{
Create: true,
}),
@@ -120,6 +128,7 @@ type TFStreamProcessorRSModel struct {
ProjectID types.String `tfsdk:"project_id"`
State types.String `tfsdk:"state"`
Stats types.String `tfsdk:"stats"`
Tier types.String `tfsdk:"tier"`
Timeouts timeouts.Value `tfsdk:"timeouts"`
DeleteOnCreateTimeout types.Bool `tfsdk:"delete_on_create_timeout"`
}
@@ -155,6 +164,7 @@ type TFStreamProcessorDSModel struct {
ProjectID types.String `tfsdk:"project_id"`
State types.String `tfsdk:"state"`
Stats types.String `tfsdk:"stats"`
Tier types.String `tfsdk:"tier"`
}

type TFStreamProcessorsDSModel struct {
98 changes: 98 additions & 0 deletions internal/service/streamprocessor/resource_test.go
@@ -53,6 +53,67 @@ func TestAccStreamProcessor_basic(t *testing.T) {
resource.ParallelTest(t, *basicTestCase(t))
}

func TestAccStreamProcessor_withTier(t *testing.T) {
**Review comment (Member):** TestAccStreamProcessor_withTier and TestAccStreamProcessor_tierUpdate seem to be doing the same steps, right? I would leave only TestAccStreamProcessor_withTier.

var (
projectID, workspaceName = acc.ProjectIDExecutionWithStreamInstance(t)
randomSuffix = acctest.RandString(5)
processorName = "new-processor-tier" + randomSuffix
tier = "SP30"
)

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acc.PreCheckBasic(t) },
ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
CheckDestroy: checkDestroyStreamProcessor,
Steps: []resource.TestStep{
{
Config: configWithTier(t, projectID, workspaceName, processorName, tier),
Check: resource.ComposeAggregateTestCheckFunc(
resource.TestCheckResourceAttrSet(resourceName, "id"),
),
},
{
Config: configWithTier(t, projectID, workspaceName, processorName, "SP50"),
Check: resource.ComposeAggregateTestCheckFunc(
resource.TestCheckResourceAttrSet(resourceName, "id"),
),
},
importStep(),
}})
}

func TestAccStreamProcessor_tierUpdate(t *testing.T) {
var (
projectID, workspaceName = acc.ProjectIDExecutionWithStreamInstance(t)
randomSuffix = acctest.RandString(5)
processorName = "tier-update-processor" + randomSuffix
)

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acc.PreCheckBasic(t) },
ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
CheckDestroy: checkDestroyStreamProcessor,
Steps: []resource.TestStep{
{
Config: configWithTier(t, projectID, workspaceName, processorName, "SP30"),
Check: resource.ComposeAggregateTestCheckFunc(
resource.TestCheckResourceAttrSet(resourceName, "id"),
resource.TestCheckResourceAttrSet(resourceName, "stats"),
),
},
{
// Update the stream processor tier to SP50
Config: configWithTier(t, projectID, workspaceName, processorName, "SP50"),
Check: resource.ComposeAggregateTestCheckFunc(
resource.TestCheckResourceAttrSet(resourceName, "id"),
resource.TestCheckResourceAttrSet(resourceName, "stats"),
),
},
importStep(),
},
})
}

func basicTestCase(t *testing.T) *resource.TestCase {
t.Helper()
var (
@@ -673,6 +734,43 @@ func config(t *testing.T, projectID, workspaceName, processorName, state, nameSu
`, projectID, workspaceName, processorName, pipeline, stateConfig, optionsStr, dependsOnStr, timeoutConfig, deleteOnCreateTimeoutConfig) + otherConfig
}

func configWithTier(t *testing.T, projectID, workspaceName, processorName, tier string) string {
t.Helper()

return fmt.Sprintf(`
data "mongodbatlas_stream_connection" "sample_stream_solar" {
project_id = %[1]q
workspace_name = %[2]q
connection_name = "sample_stream_solar"
}

resource "mongodbatlas_stream_processor" "processor" {
project_id = %[1]q
workspace_name = %[2]q
processor_name = %[3]q
pipeline = jsonencode([
{ "$source" = { "connectionName" = data.mongodbatlas_stream_connection.sample_stream_solar.connection_name } },
{ "$emit" = { "connectionName" = "__testLog" } }
])
state = "STARTED"
tier = %[4]q
}

data "mongodbatlas_stream_processor" "test" {
project_id = %[1]q
workspace_name = %[2]q
processor_name = %[3]q
depends_on = [mongodbatlas_stream_processor.processor]
}

data "mongodbatlas_stream_processors" "test" {
project_id = %[1]q
workspace_name = %[2]q
depends_on = [mongodbatlas_stream_processor.processor]
}
`, projectID, workspaceName, processorName, tier)
}

func configMigration(t *testing.T, projectID, instanceName, processorName, state, nameSuffix string, src, dest connectionConfig, timeoutConfig string, deleteOnCreateTimeout *bool) string {
t.Helper()
stateConfig := ""