Skip to content

Commit 2984e20

Browse files
authored
Fix Postgres CDC issues (table mappings, error handling, state, CDC scaling, etc) (#415)
1 parent c942da6 commit 2984e20

File tree

12 files changed

+2020
-119
lines changed

12 files changed

+2020
-119
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/hashicorp/terraform-plugin-docs v0.24.0
1010
github.com/hashicorp/terraform-plugin-framework v1.16.1
1111
github.com/hashicorp/terraform-plugin-framework-validators v0.19.0
12+
github.com/hashicorp/terraform-plugin-go v0.29.0
1213
github.com/hashicorp/terraform-plugin-log v0.10.0
1314
github.com/stretchr/testify v1.10.0
1415
k8s.io/apimachinery v0.34.1
@@ -45,7 +46,6 @@ require (
4546
github.com/hashicorp/hc-install v0.9.2 // indirect
4647
github.com/hashicorp/terraform-exec v0.24.0 // indirect
4748
github.com/hashicorp/terraform-json v0.27.2 // indirect
48-
github.com/hashicorp/terraform-plugin-go v0.29.0 // indirect
4949
github.com/hashicorp/terraform-registry-address v0.4.0 // indirect
5050
github.com/hashicorp/terraform-svchost v0.1.1 // indirect
5151
github.com/hashicorp/yamux v0.1.2 // indirect

pkg/internal/api/clickpipe.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,3 +402,79 @@ func (c *ClientImpl) UpdateClickPipeSettings(ctx context.Context, serviceId stri
402402

403403
return settingsResponse.Result, nil
404404
}
405+
406+
type ClickPipeCdcScaling struct {
407+
ReplicaCpuMillicores int64 `json:"replicaCpuMillicores"`
408+
ReplicaMemoryGb float64 `json:"replicaMemoryGb"`
409+
}
410+
411+
type ClickPipeCdcScalingRequest struct {
412+
ReplicaCpuMillicores int64 `json:"replicaCpuMillicores"`
413+
ReplicaMemoryGb float64 `json:"replicaMemoryGb"`
414+
}
415+
416+
func (c *ClientImpl) GetClickPipeCdcScaling(ctx context.Context, serviceId string) (*ClickPipeCdcScaling, error) {
417+
req, err := http.NewRequest(http.MethodGet, c.getServicePath(serviceId, "/clickpipesCdcScaling"), nil)
418+
if err != nil {
419+
return nil, err
420+
}
421+
body, err := c.doRequest(ctx, req)
422+
if err != nil {
423+
return nil, err
424+
}
425+
426+
scalingResponse := ResponseWithResult[ClickPipeCdcScaling]{}
427+
if err := json.Unmarshal(body, &scalingResponse); err != nil {
428+
return nil, fmt.Errorf("failed to unmarshal CDC scaling: %w", err)
429+
}
430+
431+
return &scalingResponse.Result, nil
432+
}
433+
434+
func (c *ClientImpl) UpdateClickPipeCdcScaling(ctx context.Context, serviceId string, request ClickPipeCdcScalingRequest) (*ClickPipeCdcScaling, error) {
435+
var payload bytes.Buffer
436+
if err := json.NewEncoder(&payload).Encode(request); err != nil {
437+
return nil, fmt.Errorf("failed to encode CDC scaling: %w", err)
438+
}
439+
440+
req, err := http.NewRequest(http.MethodPatch, c.getServicePath(serviceId, "/clickpipesCdcScaling"), &payload)
441+
if err != nil {
442+
return nil, err
443+
}
444+
445+
body, err := c.doRequest(ctx, req)
446+
if err != nil {
447+
return nil, err
448+
}
449+
450+
scalingResponse := ResponseWithResult[ClickPipeCdcScaling]{}
451+
if err := json.Unmarshal(body, &scalingResponse); err != nil {
452+
return nil, fmt.Errorf("failed to unmarshal CDC scaling: %w", err)
453+
}
454+
455+
return &scalingResponse.Result, nil
456+
}
457+
458+
func (c *ClientImpl) WaitForClickPipeCdcScaling(ctx context.Context, serviceId string, expectedCpuMillicores int64, expectedMemoryGb float64, maxElapsedTime time.Duration) (scaling *ClickPipeCdcScaling, err error) {
459+
checkScaling := func() error {
460+
scaling, err = c.GetClickPipeCdcScaling(ctx, serviceId)
461+
if err != nil {
462+
return err
463+
}
464+
465+
// Check if the scaling values match the expected values
466+
if scaling.ReplicaCpuMillicores == expectedCpuMillicores && scaling.ReplicaMemoryGb == expectedMemoryGb {
467+
return nil
468+
}
469+
470+
return fmt.Errorf("CDC scaling not yet applied: current cpu=%d (expected %d), memory=%.1f (expected %.1f)",
471+
scaling.ReplicaCpuMillicores, expectedCpuMillicores, scaling.ReplicaMemoryGb, expectedMemoryGb)
472+
}
473+
474+
if maxElapsedTime < 5*time.Second {
475+
maxElapsedTime = 5 * time.Second
476+
}
477+
478+
err = backoff.Retry(checkScaling, backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(maxElapsedTime), backoff.WithMaxInterval(maxElapsedTime/5)))
479+
return
480+
}

pkg/internal/api/clickpipe_models.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,12 @@ type ClickPipeKinesisSource struct {
144144
}
145145

146146
type ClickPipePostgresSource struct {
147-
Host string `json:"host"`
148-
Port int `json:"port"`
149-
Database string `json:"database"`
147+
Host string `json:"host,omitempty"`
148+
Port int `json:"port,omitempty"`
149+
Database string `json:"database,omitempty"`
150150
Credentials *ClickPipeSourceCredentials `json:"credentials,omitempty"`
151-
Settings ClickPipePostgresSettings `json:"settings"`
152-
Mappings []ClickPipePostgresTableMapping `json:"tableMappings"`
151+
Settings *ClickPipePostgresSettings `json:"settings,omitempty"`
152+
Mappings []ClickPipePostgresTableMapping `json:"tableMappings,omitempty"`
153153
TableMappingsToRemove []ClickPipePostgresTableMapping `json:"tableMappingsToRemove,omitempty"`
154154
TableMappingsToAdd []ClickPipePostgresTableMapping `json:"tableMappingsToAdd,omitempty"`
155155
}
@@ -158,7 +158,7 @@ type ClickPipePostgresSettings struct {
158158
SyncIntervalSeconds *int `json:"syncIntervalSeconds,omitempty"`
159159
PullBatchSize *int `json:"pullBatchSize,omitempty"`
160160
PublicationName *string `json:"publicationName,omitempty"`
161-
ReplicationMode string `json:"replicationMode"`
161+
ReplicationMode string `json:"replicationMode,omitempty"`
162162
ReplicationSlotName *string `json:"replicationSlotName,omitempty"`
163163
AllowNullableColumns *bool `json:"allowNullableColumns,omitempty"`
164164
InitialLoadParallelism *int `json:"initialLoadParallelism,omitempty"`

0 commit comments

Comments
 (0)