
Commit e29e4f4

feat: adds tier attribute to stream processor resource and datasource
1 parent: e8bb567

File tree (7 files changed: +100 / -15 lines)

- .changelog/todo.txt
- docs/data-sources/stream_processor.md
- docs/resources/stream_processor.md
- internal/service/streamprocessor/model.go
- internal/service/streamprocessor/resource.go
- internal/service/streamprocessor/resource_schema.go
- internal/service/streamprocessor/resource_test.go

.changelog/todo.txt

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+```release-note:enhancement
+resource/mongodbatlas_stream_processor: Adds the `tier` attribute
+```
+
+```release-note:enhancement
+data-source/mongodbatlas_stream_processor: Adds the `tier` attribute
+```

docs/data-sources/stream_processor.md

Lines changed: 1 addition & 0 deletions
@@ -140,6 +140,7 @@ output "stream_processors_results" {
 
 **NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.
 - `stats` (String) The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.
+- `tier` (String) Selected tier for the Stream Processor. Configures Memory / VCPU allowances.
 
 <a id="nestedatt--options"></a>
 ### Nested Schema for `options`

docs/resources/stream_processor.md

Lines changed: 3 additions & 1 deletion
@@ -70,6 +70,7 @@ resource "mongodbatlas_stream_processor" "stream-processor-sample-example" {
     { "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } }
   ])
   state = "STARTED"
+  tier  = "SP30"
 }
 
 resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" {

@@ -141,9 +142,10 @@ output "stream_processors_results" {
 - `delete_on_create_timeout` (Boolean) Indicates whether to delete the resource being created if a timeout is reached when waiting for completion. When set to `true` and timeout occurs, it triggers the deletion and returns immediately without waiting for deletion to complete. When set to `false`, the timeout will not trigger resource deletion. If you suspect a transient error when the value is `true`, wait before retrying to allow resource deletion to finish. Default is `true`.
 - `instance_name` (String, Deprecated) Label that identifies the stream processing workspace.
 - `options` (Attributes) Optional configuration for the stream processor. (see [below for nested schema](#nestedatt--options))
-- `state` (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`. When a Stream Processor is created without specifying the state, it will default to `CREATED` state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.
+- `state` (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`. When a Stream Processor is created without specifying the state, it will default to `CREATED` state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.
 
 **NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.
+- `tier` (String) Selected tier for the Stream Processor. Configures Memory / VCPU allowances.
 - `timeouts` (Attributes) (see [below for nested schema](#nestedatt--timeouts))
 - `workspace_name` (String) Label that identifies the stream processing workspace.
 

internal/service/streamprocessor/model.go

Lines changed: 2 additions & 0 deletions
@@ -114,6 +114,7 @@ func NewStreamProcessorWithStats(ctx context.Context, projectID, instanceName, w
 		ProjectID: types.StringPointerValue(&projectID),
 		State:     types.StringPointerValue(&apiResp.State),
 		Stats:     statsTF,
+		Tier:      types.StringPointerValue(apiResp.Tier),
 	}
 
 	if workspaceName != "" {

@@ -158,6 +159,7 @@ func NewTFStreamprocessorDSModel(ctx context.Context, projectID, instanceName, w
 		ProjectID: types.StringPointerValue(&projectID),
 		State:     types.StringPointerValue(&apiResp.State),
 		Stats:     statsTF,
+		Tier:      types.StringPointerValue(apiResp.Tier),
 	}
 
 	if workspaceName != "" {
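Unlike `State`, which is passed by address, `Tier` is handed to `types.StringPointerValue` as-is, which suggests it is an optional pointer field on the SDK response. A minimal sketch (not part of this commit) of how that helper treats a missing value, using only the plugin-framework types package:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/terraform-plugin-framework/types"
)

func main() {
	// An API response without a tier yields a nil pointer, which maps to a
	// null Terraform value rather than an empty string.
	var unset *string
	fmt.Println(types.StringPointerValue(unset).IsNull()) // true

	// A populated tier round-trips as a normal string value.
	tier := "SP30"
	fmt.Println(types.StringPointerValue(&tier).ValueString()) // SP30
}
```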

internal/service/streamprocessor/resource.go

Lines changed: 12 additions & 14 deletions
@@ -107,13 +107,12 @@ func (r *streamProcessorRS) Create(ctx context.Context, req resource.CreateReque
 	}
 
 	if needsStarting {
-		_, err := connV2.StreamsApi.StartStreamProcessorWithParams(ctx,
-			&admin.StartStreamProcessorApiParams{
-				GroupId:       projectID,
-				TenantName:    workspaceOrInstanceName,
-				ProcessorName: processorName,
-			},
-		).Execute()
+		startWithOptions := &admin.StreamsStartStreamProcessorWith{}
+		if plan.Tier.ValueString() != "" {
+			startWithOptions.SetTier(plan.Tier.ValueString())
+		}
+
+		_, err := connV2.StreamsApi.StartStreamProcessorWith(ctx, projectID, workspaceOrInstanceName, processorName, startWithOptions).Execute()
 		if err != nil {
 			resp.Diagnostics.AddError(errorCreateStart, err.Error())
 			return

@@ -237,13 +236,12 @@ func (r *streamProcessorRS) Update(ctx context.Context, req resource.UpdateReque
 
 	// start the stream processor if the desired state is started
 	if plannedState == StartedState {
-		_, err := r.Client.AtlasV2.StreamsApi.StartStreamProcessorWithParams(ctx,
-			&admin.StartStreamProcessorApiParams{
-				GroupId:       projectID,
-				TenantName:    workspaceOrInstanceName,
-				ProcessorName: processorName,
-			},
-		).Execute()
+		startWithOptions := &admin.StreamsStartStreamProcessorWith{}
+		if plan.Tier.ValueString() != "" {
+			startWithOptions.SetTier(plan.Tier.ValueString())
+		}
+
+		_, err := r.Client.AtlasV2.StreamsApi.StartStreamProcessorWith(ctx, projectID, workspaceOrInstanceName, processorName, startWithOptions).Execute()
 		if err != nil {
 			resp.Diagnostics.AddError("Error starting stream processor", err.Error())
 			return
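Both hunks apply the same guard: the tier from the plan is only forwarded to `SetTier` when it is non-empty, so a configuration that omits `tier` leaves the choice to the Atlas-side default. A hedged sketch of that guard in isolation; `setTier` below is a stand-in callback for the SDK setter, not the real API:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/terraform-plugin-framework/types"
)

// applyTierIfSet mirrors the Create/Update guard: null, unknown, and empty
// plan values all return "" from ValueString, so the setter is skipped and
// the server default applies.
func applyTierIfSet(planTier types.String, setTier func(string)) {
	if v := planTier.ValueString(); v != "" {
		setTier(v)
	}
}

func main() {
	applyTierIfSet(types.StringValue("SP30"), func(v string) { fmt.Println("tier set to", v) })
	applyTierIfSet(types.StringNull(), func(v string) { fmt.Println("unexpected:", v) }) // setter not called
}
```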

internal/service/streamprocessor/resource_schema.go

Lines changed: 10 additions & 0 deletions
@@ -95,6 +95,14 @@ func ResourceSchema(ctx context.Context) schema.Schema {
 			Computed:            true,
 			MarkdownDescription: "The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.",
 		},
+		"tier": schema.StringAttribute{
+			Optional:            true,
+			Computed:            true,
+			MarkdownDescription: "Selected tier for the Stream Processor. Configures Memory / VCPU allowances.",
+			PlanModifiers: []planmodifier.String{
+				stringplanmodifier.UseStateForUnknown(),
+			},
+		},
 		"timeouts": timeouts.Attributes(ctx, timeouts.Opts{
 			Create: true,
 		}),

@@ -120,6 +128,7 @@ type TFStreamProcessorRSModel struct {
 	ProjectID             types.String   `tfsdk:"project_id"`
 	State                 types.String   `tfsdk:"state"`
 	Stats                 types.String   `tfsdk:"stats"`
+	Tier                  types.String   `tfsdk:"tier"`
 	Timeouts              timeouts.Value `tfsdk:"timeouts"`
 	DeleteOnCreateTimeout types.Bool     `tfsdk:"delete_on_create_timeout"`
 }

@@ -155,6 +164,7 @@ type TFStreamProcessorDSModel struct {
 	ProjectID types.String `tfsdk:"project_id"`
 	State     types.String `tfsdk:"state"`
 	Stats     types.String `tfsdk:"stats"`
+	Tier      types.String `tfsdk:"tier"`
 }
 
 type TFStreamProcessorsDSModel struct {
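`tier` is declared Optional and Computed with `UseStateForUnknown`, the usual plugin-framework pattern for a server-defaulted field. Below is an annotated restatement of that declaration; the comments are editorial and not part of the commit:

```go
"tier": schema.StringAttribute{
	// Optional: practitioners may pin a tier such as "SP30" in configuration.
	Optional: true,
	// Computed: when tier is omitted, Atlas selects one and it is stored in state.
	Computed: true,
	MarkdownDescription: "Selected tier for the Stream Processor. Configures Memory / VCPU allowances.",
	// Without UseStateForUnknown, plans for configurations that omit tier would
	// keep showing the value as "(known after apply)"; the modifier carries the
	// known state value forward on subsequent plans, avoiding noisy diffs.
	PlanModifiers: []planmodifier.String{
		stringplanmodifier.UseStateForUnknown(),
	},
},
```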

internal/service/streamprocessor/resource_test.go

Lines changed: 65 additions & 0 deletions
@@ -53,6 +53,33 @@ func TestAccStreamProcessor_basic(t *testing.T) {
 	resource.ParallelTest(t, *basicTestCase(t))
 }
 
+func TestAccStreamProcessor_withTier(t *testing.T) {
+	var (
+		projectID, workspaceName = acc.ProjectIDExecutionWithStreamInstance(t)
+		randomSuffix             = acctest.RandString(5)
+		processorName            = "new-processor-tier" + randomSuffix
+		tier                     = "SP30"
+	)
+
+	resource.ParallelTest(t, resource.TestCase{
+		PreCheck:                 func() { acc.PreCheckBasic(t) },
+		ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
+		CheckDestroy:             checkDestroyStreamProcessor,
+		Steps: []resource.TestStep{
+			{
+				Config: configWithTier(t, projectID, workspaceName, processorName, tier),
+				Check: resource.ComposeAggregateTestCheckFunc(
+					resource.TestCheckResourceAttr(resourceName, "project_id", projectID),
+					resource.TestCheckResourceAttr(resourceName, "workspace_name", workspaceName),
+					resource.TestCheckResourceAttr(resourceName, "processor_name", processorName),
+					resource.TestCheckResourceAttr(resourceName, "state", streamprocessor.StartedState),
+					resource.TestCheckResourceAttr(resourceName, "tier", tier),
+				),
+			},
+			importStep(),
+		}})
+}
+
 func basicTestCase(t *testing.T) *resource.TestCase {
 	t.Helper()
 	var (

@@ -673,6 +700,44 @@ func config(t *testing.T, projectID, workspaceName, processorName, state, nameSu
 `, projectID, workspaceName, processorName, pipeline, stateConfig, optionsStr, dependsOnStr, timeoutConfig, deleteOnCreateTimeoutConfig) + otherConfig
 }
 
+func configWithTier(t *testing.T, projectID, workspaceName, processorName, tier string) string {
+	t.Helper()
+
+	return fmt.Sprintf(`
+resource "mongodbatlas_stream_connection" "sample_stream_solar" {
+  project_id      = %[1]q
+  workspace_name  = %[2]q
+  connection_name = "sample_stream_solar"
+  type            = "Sample"
+}
+
+resource "mongodbatlas_stream_processor" "processor" {
+  project_id     = %[1]q
+  workspace_name = %[2]q
+  processor_name = %[3]q
+  pipeline = jsonencode([
+    { "$source" = { "connectionName" = mongodbatlas_stream_connection.sample_stream_solar.connection_name } },
+    { "$emit" = { "connectionName" = "__testLog" } }
+  ])
+  state = "STARTED"
+  tier  = %[4]q
+}
+
+data "mongodbatlas_stream_processor" "test" {
+  project_id     = %[1]q
+  workspace_name = %[2]q
+  processor_name = %[3]q
+  depends_on     = [mongodbatlas_stream_processor.processor]
+}
+
+data "mongodbatlas_stream_processors" "test" {
+  project_id     = %[1]q
+  workspace_name = %[2]q
+  depends_on     = [mongodbatlas_stream_processor.processor]
+}
+`, projectID, workspaceName, processorName, tier)
+}
+
 func configMigration(t *testing.T, projectID, instanceName, processorName, state, nameSuffix string, src, dest connectionConfig, timeoutConfig string, deleteOnCreateTimeout *bool) string {
 	t.Helper()
 	stateConfig := ""
