diff --git a/.gitignore b/.gitignore
index da529d0f..71895fce 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,6 @@
 dev/
 .idea/
 *.backup
 
-**/variables.tfvars
+**/variables*.tfvars
 /tmp
diff --git a/examples/clickpipe/bigquery_snapshot/README.md b/examples/clickpipe/bigquery_snapshot/README.md
new file mode 100644
index 00000000..5bb49ad0
--- /dev/null
+++ b/examples/clickpipe/bigquery_snapshot/README.md
@@ -0,0 +1,34 @@
+## ClickPipe BigQuery snapshot example
+
+This example demonstrates how to deploy a BigQuery snapshot ClickPipe using Terraform.
+
+It provisions all necessary GCP prerequisites, including:
+- GCS staging bucket
+- IAM service account with required permissions
+- IAM service account key
+
+The BigQuery dataset and tables must already exist.
+
+## How to run
+
+- Rename `variables.tfvars.sample` to `variables.tfvars` and fill in the required values.
+- Run `terraform init`
+- Run `terraform apply -var-file=variables.tfvars`
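+
+For reference, a filled-in `variables.tfvars` might look like the following
+sketch (all values are placeholders; see `variables.tfvars.sample` for a
+complete starting point):
+
+```hcl
+organization_id      = "<your-organization-id>"
+token_key            = "<api-token-key>"
+token_secret         = "<api-token-secret>"
+service_id           = "<clickhouse-service-id>"
+gcp_project_id       = "your-project"
+gcp_region           = "us-central1"
+bigquery_dataset_id  = "test_dataset"
+bigquery_table_names = ["test_table_1", "test_table_2"]
+```
+
+On success, `terraform apply` prints the `clickpipe_id` and `clickpipe_state`
+outputs defined in `clickpipe.tf`.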
diff --git a/examples/clickpipe/bigquery_snapshot/clickpipe.tf b/examples/clickpipe/bigquery_snapshot/clickpipe.tf
new file mode 100644
index 00000000..684c8a03
--- /dev/null
+++ b/examples/clickpipe/bigquery_snapshot/clickpipe.tf
@@ -0,0 +1,45 @@
+resource "random_id" "clickpipes_suffix" {
+  byte_length = 4
+}
+
+locals {
+  snapshot_staging_path = "gs://${google_storage_bucket.clickpipes_staging_bucket.name}/${random_id.clickpipes_suffix.hex}/"
+}
+
+resource "clickhouse_clickpipe" "bigquery_snapshot" {
+  name = "BigQuery Snapshot ClickPipe"
+
+  service_id = var.service_id
+
+  source = {
+    bigquery = {
+      snapshot_staging_path = local.snapshot_staging_path
+
+      credentials = {
+        service_account_file = google_service_account_key.clickpipes_key.private_key
+      }
+
+      settings = {
+        replication_mode = "snapshot"
+      }
+
+      table_mappings = [for table_name in var.bigquery_table_names : {
+        source_dataset_name = var.bigquery_dataset_id
+        source_table        = table_name
+        target_table        = "${table_name}_${random_id.clickpipes_suffix.hex}"
+      }]
+    }
+  }
+
+  destination = {
+    database = "default"
+  }
+}
+
+output "clickpipe_id" {
+  value = clickhouse_clickpipe.bigquery_snapshot.id
+}
+
+output "clickpipe_state" {
+  value = clickhouse_clickpipe.bigquery_snapshot.state
+}
diff --git a/examples/clickpipe/bigquery_snapshot/gcp.tf b/examples/clickpipe/bigquery_snapshot/gcp.tf
new file mode 100644
index 00000000..d633603f
--- /dev/null
+++ b/examples/clickpipe/bigquery_snapshot/gcp.tf
@@ -0,0 +1,107 @@
+resource "random_id" "suffix" {
+  byte_length = 4
+}
+
+locals {
+  dataset_name        = split("/", data.google_bigquery_dataset.dataset.id)[length(split("/", data.google_bigquery_dataset.dataset.id)) - 1]
+  staging_bucket_name = "${var.gcp_project_id}-clickpipe-staging-${random_id.suffix.hex}"
+  sa_name             = "clickpipe-bigquery-${random_id.suffix.hex}"
+  sa_display_name     = "ClickPipe BigQuery Service Account"
+}
+
+// Ensures the BigQuery dataset and tables exist
+data "google_bigquery_dataset" "dataset" {
+  project    = var.gcp_project_id
+  dataset_id = var.bigquery_dataset_id
+}
+
+data "google_bigquery_table" "table" {
+  for_each = toset(var.bigquery_table_names)
+
+  dataset_id = var.bigquery_dataset_id
+  table_id   = each.value
+}
+
+// This bucket is used by ClickPipe to stage data during BigQuery exports
+resource "google_storage_bucket" "clickpipes_staging_bucket" {
+  name          = local.staging_bucket_name
+  location      = var.gcp_region
+  project       = var.gcp_project_id
+  force_destroy = true // do not use in production
+
+  uniform_bucket_level_access = true
+
+  lifecycle_rule {
+    condition {
+      age = 1
+    }
+    action {
+      type = "Delete"
+    }
+  }
+}
+
+// Service account for ClickPipe to access BigQuery and GCS
+resource "google_service_account" "clickpipes" {
+  project      = var.gcp_project_id
+  account_id   = local.sa_name
+  display_name = local.sa_display_name
+  description  = "Service account for ClickPipe to access BigQuery and GCS"
+}
+
+// Service account key for ClickPipe
+resource "google_service_account_key" "clickpipes_key" {
+  service_account_id = google_service_account.clickpipes.name
+  public_key_type    = "TYPE_X509_PEM_FILE"
+  private_key_type   = "TYPE_GOOGLE_CREDENTIALS_FILE"
+}
+
+// Grants viewer access to BigQuery datasets and tables, scoped by a dataset-level condition
+resource "google_project_iam_member" "bigquery_data_viewer" {
+  project = var.gcp_project_id
+  role    = "roles/bigquery.dataViewer"
+  member  = "serviceAccount:${google_service_account.clickpipes.email}"
+
+  condition {
+    title       = "Restrict access to specific dataset"
+    description = "Allow access only to the designated BigQuery dataset"
+    expression  = "resource.name.startsWith(\"projects/${var.gcp_project_id}/datasets/${local.dataset_name}\")"
+  }
+}
+
+// This allows ClickPipes to run BigQuery export jobs
+resource "google_project_iam_member" "bigquery_job_user" {
+  project = var.gcp_project_id
+  role    = "roles/bigquery.jobUser"
+  member  = "serviceAccount:${google_service_account.clickpipes.email}"
+}
+
+// GCS Object Admin role with bucket-level condition
+resource "google_project_iam_member" "storage_object_admin" {
+  project = var.gcp_project_id
+  role    = "roles/storage.objectAdmin"
+  member  = "serviceAccount:${google_service_account.clickpipes.email}"
+
+  condition {
+    title       = "Restrict access to staging bucket"
+    description = "Allow access only to the ClickPipe staging bucket"
+    expression  = "resource.name.startsWith(\"projects/_/buckets/${local.staging_bucket_name}\")"
+  }
+}
+
+// GCS Bucket Viewer role with bucket-level condition
+resource "google_project_iam_member" "storage_bucket_viewer" {
+  project = var.gcp_project_id
+  role    = "roles/storage.bucketViewer"
+  member  = "serviceAccount:${google_service_account.clickpipes.email}"
+
+  condition {
+    title       = "Restrict access to staging bucket"
+    description = "Allow access only to the ClickPipe staging bucket"
+    expression  = "resource.name.startsWith(\"projects/_/buckets/${local.staging_bucket_name}\")"
+  }
+}
+
+output "clickpipes_bigquery_service_account_email" {
+  value = google_service_account.clickpipes.email
+}
diff --git a/examples/clickpipe/bigquery_snapshot/provider.tf b/examples/clickpipe/bigquery_snapshot/provider.tf
new file mode 100644
index 00000000..9fb24ac0
--- /dev/null
+++ b/examples/clickpipe/bigquery_snapshot/provider.tf
@@ -0,0 +1,20 @@
+# This file is generated automatically; please do not edit.
+terraform {
+  required_providers {
+    clickhouse = {
+      version = "3.8.1-alpha1"
+      source  = "ClickHouse/clickhouse"
+    }
+  }
+}
+
+provider "clickhouse" {
+  organization_id = var.organization_id
+  token_key       = var.token_key
+  token_secret    = var.token_secret
+}
+
+provider "google" {
+  project = var.gcp_project_id
+  region  = var.gcp_region
+}
diff --git a/examples/clickpipe/bigquery_snapshot/provider.tf.template.alpha b/examples/clickpipe/bigquery_snapshot/provider.tf.template.alpha
new file mode 100644
index 00000000..27097ead
--- /dev/null
+++ b/examples/clickpipe/bigquery_snapshot/provider.tf.template.alpha
@@ -0,0 +1,23 @@
+terraform {
+  required_providers {
+    clickhouse = {
+      version = "${CLICKHOUSE_TERRAFORM_PROVIDER_VERSION}"
+      source  = "ClickHouse/clickhouse"
+    }
+  }
+}
+
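+# NOTE: ${CLICKHOUSE_TERRAFORM_PROVIDER_VERSION} above is a template placeholder;
+# it is presumably substituted with the released provider version when the
+# committed provider.tf is generated from this file.
+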
+provider "clickhouse" {
+  organization_id = var.organization_id
+  token_key       = var.token_key
+  token_secret    = var.token_secret
+}
+
+provider "google" {
+  project = var.gcp_project_id
+  region  = var.gcp_region
+}
diff --git a/examples/clickpipe/bigquery_snapshot/variables.tfvars.sample b/examples/clickpipe/bigquery_snapshot/variables.tfvars.sample
new file mode 100644
index 00000000..4e932cad
--- /dev/null
+++ b/examples/clickpipe/bigquery_snapshot/variables.tfvars.sample
@@ -0,0 +1,10 @@
+# These keys are examples only and will not work against a deployed ClickHouse OpenAPI server.
+organization_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71"
+token_key       = "avhj1U5QCdWAE9CA9"
+token_secret    = "4b1dROiHQEuSXJHlV8zHFd0S7WQj7CGxz5kGJeJnca"
+service_id      = "aee076c1-3f83-4637-95b1-ad5a0a825b71"
+
+gcp_project_id       = "your-project"
+gcp_region           = "us-central1"
+bigquery_dataset_id  = "test_dataset"
+bigquery_table_names = ["test_table_1", "test_table_2"]
diff --git a/examples/clickpipe/bigquery_snapshot/vars.tf b/examples/clickpipe/bigquery_snapshot/vars.tf
new file mode 100644
index 00000000..52e17d68
--- /dev/null
+++ b/examples/clickpipe/bigquery_snapshot/vars.tf
@@ -0,0 +1,30 @@
+variable "organization_id" {
+  description = "ClickHouse Cloud organization ID"
+}
+variable "token_key" {
+  description = "ClickHouse Cloud API token key"
+}
+variable "token_secret" {
+  description = "ClickHouse Cloud API token secret"
+}
+
+variable "service_id" {
+  description = "ClickHouse ClickPipe service ID"
+}
+
+variable "gcp_project_id" {
+  description = "GCP project ID where the BigQuery dataset is located"
+}
+
+variable "gcp_region" {
+  description = "GCP region for the BigQuery dataset"
+}
+
+variable "bigquery_dataset_id" {
+  description = "Source BigQuery dataset ID"
+}
+
+variable "bigquery_table_names" {
+  description = "Source BigQuery table names"
+  type        = list(string)
+}
diff --git a/examples/clickpipe/externally_managed_table/variables.sample.tfvars b/examples/clickpipe/externally_managed_table/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/externally_managed_table/variables.sample.tfvars
rename to examples/clickpipe/externally_managed_table/variables.tfvars.sample
diff --git a/examples/clickpipe/kafka_azure_eventhub/variables.sample.tfvars b/examples/clickpipe/kafka_azure_eventhub/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/kafka_azure_eventhub/variables.sample.tfvars
rename to examples/clickpipe/kafka_azure_eventhub/variables.tfvars.sample
diff --git a/examples/clickpipe/kafka_confluent/variables.sample.tfvars b/examples/clickpipe/kafka_confluent/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/kafka_confluent/variables.sample.tfvars
rename to examples/clickpipe/kafka_confluent/variables.tfvars.sample
diff --git a/examples/clickpipe/kafka_msk_iam_role/variables.sample.tfvars b/examples/clickpipe/kafka_msk_iam_role/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/kafka_msk_iam_role/variables.sample.tfvars
rename to examples/clickpipe/kafka_msk_iam_role/variables.tfvars.sample
diff --git a/examples/clickpipe/kafka_msk_iam_user/variables.sample.tfvars b/examples/clickpipe/kafka_msk_iam_user/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/kafka_msk_iam_user/variables.sample.tfvars
rename to examples/clickpipe/kafka_msk_iam_user/variables.tfvars.sample
diff --git a/examples/clickpipe/kafka_offset_strategy/variables.sample.tfvars b/examples/clickpipe/kafka_offset_strategy/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/kafka_offset_strategy/variables.sample.tfvars
rename to examples/clickpipe/kafka_offset_strategy/variables.tfvars.sample
diff --git a/examples/clickpipe/kafka_redpanda_scram/variables.sample.tfvars b/examples/clickpipe/kafka_redpanda_scram/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/kafka_redpanda_scram/variables.sample.tfvars
rename to examples/clickpipe/kafka_redpanda_scram/variables.tfvars.sample
diff --git a/examples/clickpipe/kafka_schema_registry/variables.sample.tfvars b/examples/clickpipe/kafka_schema_registry/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/kafka_schema_registry/variables.sample.tfvars
rename to examples/clickpipe/kafka_schema_registry/variables.tfvars.sample
diff --git a/examples/clickpipe/multiple_pipes_example/variables.sample.tfvars b/examples/clickpipe/multiple_pipes_example/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/multiple_pipes_example/variables.sample.tfvars
rename to examples/clickpipe/multiple_pipes_example/variables.tfvars.sample
diff --git a/examples/clickpipe/object_storage_azure_blob/variables.sample.tfvars b/examples/clickpipe/object_storage_azure_blob/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/object_storage_azure_blob/variables.sample.tfvars
rename to examples/clickpipe/object_storage_azure_blob/variables.tfvars.sample
diff --git a/examples/clickpipe/object_storage_iam_role/variables.sample.tfvars b/examples/clickpipe/object_storage_iam_role/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/object_storage_iam_role/variables.sample.tfvars
rename to examples/clickpipe/object_storage_iam_role/variables.tfvars.sample
diff --git a/examples/clickpipe/object_storage_iam_user/variables.sample.tfvars b/examples/clickpipe/object_storage_iam_user/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/object_storage_iam_user/variables.sample.tfvars
rename to examples/clickpipe/object_storage_iam_user/variables.tfvars.sample
diff --git a/examples/clickpipe/object_storage_s3_sqs_iam_role/variables.sample.tfvars b/examples/clickpipe/object_storage_s3_sqs_iam_role/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/object_storage_s3_sqs_iam_role/variables.sample.tfvars
rename to examples/clickpipe/object_storage_s3_sqs_iam_role/variables.tfvars.sample
diff --git a/examples/clickpipe/object_storage_s3_sqs_iam_user/variables.sample.tfvars b/examples/clickpipe/object_storage_s3_sqs_iam_user/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/object_storage_s3_sqs_iam_user/variables.sample.tfvars
rename to examples/clickpipe/object_storage_s3_sqs_iam_user/variables.tfvars.sample
diff --git a/examples/clickpipe/reverse_private_endpoint_msk/variables.sample.tfvars b/examples/clickpipe/reverse_private_endpoint_msk/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/reverse_private_endpoint_msk/variables.sample.tfvars
rename to examples/clickpipe/reverse_private_endpoint_msk/variables.tfvars.sample
diff --git a/examples/clickpipe/reverse_private_endpoint_msk_pipe/variables.sample.tfvars b/examples/clickpipe/reverse_private_endpoint_msk_pipe/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/reverse_private_endpoint_msk_pipe/variables.sample.tfvars
rename to examples/clickpipe/reverse_private_endpoint_msk_pipe/variables.tfvars.sample
diff --git a/examples/clickpipe/reverse_private_endpoint_vpc_resource/variables.sample.tfvars b/examples/clickpipe/reverse_private_endpoint_vpc_resource/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/reverse_private_endpoint_vpc_resource/variables.sample.tfvars
rename to examples/clickpipe/reverse_private_endpoint_vpc_resource/variables.tfvars.sample
diff --git a/examples/clickpipe/reverse_private_endpoint_vpce_service/variables.sample.tfvars b/examples/clickpipe/reverse_private_endpoint_vpce_service/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/reverse_private_endpoint_vpce_service/variables.sample.tfvars
rename to examples/clickpipe/reverse_private_endpoint_vpce_service/variables.tfvars.sample
diff --git a/examples/clickpipe/service_and_clickpipe/variables.sample.tfvars b/examples/clickpipe/service_and_clickpipe/variables.tfvars.sample
similarity index 100%
rename from examples/clickpipe/service_and_clickpipe/variables.sample.tfvars
rename to examples/clickpipe/service_and_clickpipe/variables.tfvars.sample
diff --git a/examples/resources/clickhouse_clickpipe/resource_bigquery_snapshot.tf b/examples/resources/clickhouse_clickpipe/resource_bigquery_snapshot.tf
new file mode 100644
index 00000000..072941ab
--- /dev/null
+++ b/examples/resources/clickhouse_clickpipe/resource_bigquery_snapshot.tf
@@ -0,0 +1,38 @@
+resource "clickhouse_clickpipe" "bigquery_snapshot_clickpipe" {
+  name = "BigQuery Snapshot ClickPipe"
+
+  service_id = "dc189652-b621-4bee-9088-b5b4c3f88626"
+
+  source = {
+    bigquery = {
+      snapshot_staging_path = "gs://my-staging-bucket/"
+
+      credentials = {
+        # Base64-encoded service account JSON key
+        service_account_file = "ewogICJuYW1lIjogInByb2plY3RzL1BST0pFQ1RfSUQvc2VydmljZUFjY291bnRzL1NFUlZJQ0VfQUNDT1VOVF9FTUFJTC9rZXlzL0tFWV9JRCIsCiAgInByaXZhdGVLZXlUeXBlIjogIlRZUEVfR09PR0xFX0NSRURFTlRJQUxTX0ZJTEUiLAogICJwcml2YXRlS2V5RGF0YSI6ICJFTkNPREVEX1BSSVZBVEVfS0VZIiwKICAidmFsaWRBZnRlclRpbWUiOiAiREFURSIsCiAgInZhbGlkQmVmb3JlVGltZSI6ICJEQVRFIiwKICAia2V5QWxnb3JpdGhtIjogIktFWV9BTEdfUlNBXzIwNDgiCn0="
+      }
+
+      settings = {
+        replication_mode = "snapshot"
+      }
+
+      table_mappings = [{
+        source_dataset_name = "test_dataset"
+        source_table        = "test_table"
+        target_table        = "test_table_snapshot"
+      }]
+    }
+  }
+
+  destination = {
+    database = "default"
+  }
+}
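+
+// The mapping above uses only the required attributes. A sketch of the
+// optional per-table attributes added by this resource (values are illustrative):
+//
+//   excluded_columns       = ["column_to_skip"]
+//   use_custom_sorting_key = true
+//   sorting_keys           = ["id", "created_at"]
+//   table_engine           = "ReplacingMergeTree" // or "MergeTree", "Null"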
diff --git a/pkg/internal/api/clickpipe.go b/pkg/internal/api/clickpipe.go
index 5043c984..3eaa508a 100644
--- a/pkg/internal/api/clickpipe.go
+++ b/pkg/internal/api/clickpipe.go
@@ -149,28 +149,37 @@ var ClickPipeObjectStorageCompressions = []string{
 const (
 	// Postgres replication modes
-	ClickPipePostgresReplicationModeCDC      = "cdc"
-	ClickPipePostgresReplicationModeSnapshot = "snapshot"
-	ClickPipePostgresReplicationModeCDCOnly  = "cdc_only"
+	ClickPipeReplicationModeCDC      = "cdc"
+	ClickPipeReplicationModeSnapshot = "snapshot"
+	ClickPipeReplicationModeCDCOnly  = "cdc_only"
+)
+
+const (
+	ClickPipeTableEngineMergeTree          = "MergeTree"
+	ClickPipeTableEngineReplacingMergeTree = "ReplacingMergeTree"
+	ClickPipeTableEngineNull               = "Null"
 )
 
 var ClickPipePostgresReplicationModes = []string{
-	ClickPipePostgresReplicationModeCDC,
-	ClickPipePostgresReplicationModeSnapshot,
-	ClickPipePostgresReplicationModeCDCOnly,
+	ClickPipeReplicationModeCDC,
+	ClickPipeReplicationModeSnapshot,
+	ClickPipeReplicationModeCDCOnly,
 }
 
-const (
-	// Postgres table engines
-	ClickPipePostgresTableEngineMergeTree          = "MergeTree"
-	ClickPipePostgresTableEngineReplacingMergeTree = "ReplacingMergeTree"
-	ClickPipePostgresTableEngineNull               = "Null"
-)
+var ClickPipeBigQueryReplicationModes = []string{
+	ClickPipeReplicationModeSnapshot,
+}
 
 var ClickPipePostgresTableEngines = []string{
-	ClickPipePostgresTableEngineMergeTree,
-	ClickPipePostgresTableEngineReplacingMergeTree,
-	ClickPipePostgresTableEngineNull,
+	ClickPipeTableEngineMergeTree,
+	ClickPipeTableEngineReplacingMergeTree,
+	ClickPipeTableEngineNull,
+}
+
+var ClickPipeBigQueryTableEngines = []string{
+	ClickPipeTableEngineMergeTree,
+	ClickPipeTableEngineReplacingMergeTree,
+	ClickPipeTableEngineNull,
 }
 
 const (
diff --git a/pkg/internal/api/clickpipe_models.go b/pkg/internal/api/clickpipe_models.go
index 45afd201..b415475e 100644
--- a/pkg/internal/api/clickpipe_models.go
+++ b/pkg/internal/api/clickpipe_models.go
@@ -177,11 +177,43 @@ type ClickPipePostgresTableMapping struct {
 	TableEngine *string `json:"tableEngine,omitempty"`
 }
 
+type ClickPipeServiceAccount struct {
+	ServiceAccountFile string `json:"serviceAccountFile,omitempty"`
+}
+
+type ClickPipeBigQuerySettings struct {
+	ReplicationMode                string `json:"replicationMode,omitempty"`
+	AllowNullableColumns           *bool  `json:"allowNullableColumns,omitempty"`
+	InitialLoadParallelism         *int   `json:"initialLoadParallelism,omitempty"`
+	SnapshotNumRowsPerPartition    *int   `json:"snapshotNumRowsPerPartition,omitempty"`
+	SnapshotNumberOfParallelTables *int   `json:"snapshotNumberOfParallelTables,omitempty"`
+}
+
+type ClickPipeBigQueryTableMapping struct {
+	SourceDatasetName   string   `json:"sourceDatasetName"`
+	SourceTable         string   `json:"sourceTable"`
+	TargetTable         string   `json:"targetTable"`
+	ExcludedColumns     []string `json:"excludedColumns,omitempty"`
+	UseCustomSortingKey *bool    `json:"useCustomSortingKey,omitempty"`
+	SortingKeys         []string `json:"sortingKeys,omitempty"`
+	TableEngine         *string  `json:"tableEngine,omitempty"`
+}
+
+type ClickPipeBigQuerySource struct {
+	SnapshotStagingPath   string                          `json:"snapshotStagingPath,omitempty"`
+	Settings              ClickPipeBigQuerySettings       `json:"settings"`
+	Mappings              []ClickPipeBigQueryTableMapping `json:"tableMappings"`
+	TableMappingsToRemove []ClickPipeBigQueryTableMapping `json:"tableMappingsToRemove,omitempty"`
+	TableMappingsToAdd    []ClickPipeBigQueryTableMapping `json:"tableMappingsToAdd,omitempty"`
+	Credentials           *ClickPipeServiceAccount        `json:"credentials,omitempty"`
+}
+
 type ClickPipeSource struct {
 	Kafka         *ClickPipeKafkaSource         `json:"kafka,omitempty"`
 	ObjectStorage *ClickPipeObjectStorageSource `json:"objectStorage,omitempty"`
 	Kinesis       *ClickPipeKinesisSource       `json:"kinesis,omitempty"`
 	Postgres      *ClickPipePostgresSource      `json:"postgres,omitempty"`
+	BigQuery      *ClickPipeBigQuerySource      `json:"bigquery,omitempty"`
 	ValidateSamples bool `json:"validateSamples,omitempty"`
 }
diff --git a/pkg/resource/clickpipe.go b/pkg/resource/clickpipe.go
index 5dd5b0fd..65847564 100644
--- a/pkg/resource/clickpipe.go
+++ b/pkg/resource/clickpipe.go
@@ -24,6 +24,7 @@ import (
 	"github.com/hashicorp/terraform-plugin-framework/resource/schema/boolplanmodifier"
 	"github.com/hashicorp/terraform-plugin-framework/resource/schema/int64default"
 	"github.com/hashicorp/terraform-plugin-framework/resource/schema/int64planmodifier"
+	"github.com/hashicorp/terraform-plugin-framework/resource/schema/listdefault"
 	"github.com/hashicorp/terraform-plugin-framework/resource/schema/listplanmodifier"
 	"github.com/hashicorp/terraform-plugin-framework/resource/schema/objectplanmodifier"
 	"github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier"
@@ -49,6 +50,7 @@ const (
 	SourceTypeObjectStorage SourceType = "object_storage"
 	SourceTypeKinesis       SourceType = "kinesis"
 	SourceTypePostgres      SourceType = "postgres"
+	SourceTypeBigQuery      SourceType = "bigquery"
 	SourceTypeUnknown       SourceType = "unknown"
 )
@@ -772,6 +774,146 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest,
 					},
 				},
 			},
+			"bigquery": schema.SingleNestedAttribute{
+				MarkdownDescription: "The BigQuery source configuration for the ClickPipe.",
+				Optional:            true,
+				Attributes: map[string]schema.Attribute{
+					"snapshot_staging_path": schema.StringAttribute{
+						Description: "GCS bucket path for staging snapshot data (e.g., gs://my-bucket/staging/). Data will be automatically cleaned up after the initial load.",
+						Required:    true,
+					},
+					"credentials": schema.SingleNestedAttribute{
+						MarkdownDescription: "The credentials for BigQuery access.",
+						Required:            true,
+						Attributes: map[string]schema.Attribute{
+							"service_account_file": schema.StringAttribute{
+								Description: "Google Cloud service account JSON key file content, base64 encoded.",
+								Required:    true,
+								Sensitive:   true,
+							},
+						},
+					},
+					"settings": schema.SingleNestedAttribute{
+						MarkdownDescription: "Settings for the BigQuery pipe.",
+						Required:            true,
+						Attributes: map[string]schema.Attribute{
+							"replication_mode": schema.StringAttribute{
+								Required: true,
+								MarkdownDescription: fmt.Sprintf(
+									"Replication mode for the BigQuery pipe. (%s)",
+									wrapStringsWithBackticksAndJoinCommaSeparated(api.ClickPipeBigQueryReplicationModes),
+								),
+								Validators: []validator.String{
+									stringvalidator.OneOf(api.ClickPipeBigQueryReplicationModes...),
+								},
+								PlanModifiers: []planmodifier.String{
+									stringplanmodifier.RequiresReplace(),
+								},
+							},
+							"allow_nullable_columns": schema.BoolAttribute{
+								Description: "Allow nullable columns in the destination table.",
+								Optional:    true,
+								Computed:    true,
+								Default:     booldefault.StaticBool(false),
+								PlanModifiers: []planmodifier.Bool{
+									boolplanmodifier.RequiresReplace(),
+								},
+							},
+							"initial_load_parallelism": schema.Int64Attribute{
+								Description: "Number of parallel workers during the initial load.",
+								Optional:    true,
+								Computed:    true,
+								Default:     int64default.StaticInt64(4),
+								Validators: []validator.Int64{
+									int64validator.AtLeast(1),
+								},
+								PlanModifiers: []planmodifier.Int64{
+									int64planmodifier.RequiresReplace(),
+								},
+							},
+							"snapshot_num_rows_per_partition": schema.Int64Attribute{
+								Description: "Number of rows to snapshot per partition.",
+								Computed:    true,
+								Optional:    true,
+								Default:     int64default.StaticInt64(100_000),
+								Validators: []validator.Int64{
+									int64validator.AtLeast(1),
+								},
+								PlanModifiers: []planmodifier.Int64{
+									int64planmodifier.RequiresReplace(),
+								},
+							},
+							"snapshot_number_of_parallel_tables": schema.Int64Attribute{
+								Description: "Number of parallel tables to snapshot.",
+								Computed:    true,
+								Optional:    true,
+								Default:     int64default.StaticInt64(1),
+								Validators: []validator.Int64{
+									int64validator.AtLeast(1),
+								},
+								PlanModifiers: []planmodifier.Int64{
+									int64planmodifier.RequiresReplace(),
+								},
+							},
+						},
+					},
+					"table_mappings": schema.ListNestedAttribute{
+						Description: "Table mappings from BigQuery source to ClickHouse destination.",
+						Required:    true,
+						PlanModifiers: []planmodifier.List{
+							listplanmodifier.RequiresReplace(),
+						},
+						NestedObject: schema.NestedAttributeObject{
+							Attributes: map[string]schema.Attribute{
+								"source_dataset_name": schema.StringAttribute{
+									Description: "Source BigQuery dataset name.",
+									Required:    true,
+								},
+								"source_table": schema.StringAttribute{
+									Description: "Source table name in BigQuery.",
+									Required:    true,
+								},
+								"target_table": schema.StringAttribute{
+									Description: "Target table name in ClickHouse.",
+									Required:    true,
+								},
+								"excluded_columns": schema.ListAttribute{
+									Description: "Columns to exclude from replication.",
+									Optional:    true,
+									Computed:    true,
+									Default:     listdefault.StaticValue(types.ListNull(types.StringType)),
+									ElementType: types.StringType,
+								},
+								"use_custom_sorting_key": schema.BoolAttribute{
+									Description: "Whether to use a custom sorting key for the target table.",
+									Default:     booldefault.StaticBool(false),
+									Computed:    true,
+									Optional:    true,
+								},
+								"sorting_keys": schema.ListAttribute{
+									Description: "Ordered list of columns to use as the sorting key for the target table. Required when use_custom_sorting_key is true.",
+									ElementType: types.StringType,
+									Default:     listdefault.StaticValue(types.ListNull(types.StringType)),
+									Computed:    true,
+									Optional:    true,
+								},
+								"table_engine": schema.StringAttribute{
+									MarkdownDescription: fmt.Sprintf(
+										"Table engine to use for the target table. (%s)",
+										wrapStringsWithBackticksAndJoinCommaSeparated(api.ClickPipeBigQueryTableEngines),
+									),
+									Default:  stringdefault.StaticString(api.ClickPipeTableEngineReplacingMergeTree),
+									Optional: true,
+									Computed: true,
+									Validators: []validator.String{
+										stringvalidator.OneOf(api.ClickPipeBigQueryTableEngines...),
+									},
+								},
+							},
+						},
+					},
+				},
+			},
 		},
 		Required: true,
 	},
@@ -1077,7 +1219,10 @@ func (c *ClickPipeResource) ModifyPlan(ctx context.Context, request resource.Mod
 	// Override the default value to prevent inconsistency errors
 	var sourceModel models.ClickPipeSourceModel
 	if diags := plan.Source.As(ctx, &sourceModel, basetypes.ObjectAsOptions{}); !diags.HasError() {
-		if !sourceModel.Postgres.IsNull() {
+		sourceType := getSourceType(sourceModel)
+		isDBPipe := sourceType == SourceTypePostgres || sourceType == SourceTypeBigQuery
+
+		if isDBPipe {
 			var destinationModel models.ClickPipeDestinationModel
 			if diags := plan.Destination.As(ctx, &destinationModel, basetypes.ObjectAsOptions{}); !diags.HasError() {
 				destinationModel.ManagedTable = types.BoolValue(false)
@@ -1124,6 +1269,8 @@ func (c *ClickPipeResource) Create(ctx context.Context, request resource.CreateR
 	response.Diagnostics.Append(plan.Source.As(ctx, &sourceModel, basetypes.ObjectAsOptions{})...)
 	sourceType := getSourceType(sourceModel)
 	isPostgresSource := sourceType == SourceTypePostgres
+	isBigQuerySource := sourceType == SourceTypeBigQuery
+	isDBPipe := isPostgresSource || isBigQuerySource
 
 	// Extract roles from the destination model
 	var rolesSlice []string
@@ -1137,8 +1284,7 @@
 		Roles: rolesSlice,
 	}
 
-	// For non-Postgres sources, set table, managedTable, and columns
-	if !isPostgresSource {
+	if !isDBPipe {
 		// Validate that required fields are provided
 		if destinationModel.Table.IsNull() || destinationModel.Table.ValueString() == "" {
 			response.Diagnostics.AddError(
@@ -1213,8 +1359,8 @@
 		}
 	}
 
-	// Field mappings are only for non-Postgres sources
-	if !isPostgresSource {
+	// Field mappings are only for non-DB sources
+	if !isDBPipe {
 		fieldMappingsModels := make([]models.ClickPipeFieldMappingModel, len(plan.FieldMappings.Elements()))
 		response.Diagnostics.Append(plan.FieldMappings.ElementsAs(ctx, &fieldMappingsModels, false)...)
 		clickPipe.FieldMappings = make([]api.ClickPipeFieldMapping, len(fieldMappingsModels))
@@ -1225,7 +1371,7 @@
 			}
 		}
 	}
-	// For Postgres, leave field_mappings as nil (will be omitted from JSON)
+	// For DB pipes, leave field_mappings as nil (will be omitted from JSON)
 
 	// Handle settings
 	if !plan.Settings.IsNull() && !plan.Settings.IsUnknown() {
@@ -1343,6 +1489,8 @@ func getSourceType(sourceModel models.ClickPipeSourceModel) SourceType {
 		return SourceTypeKinesis
 	} else if !sourceModel.Postgres.IsNull() {
 		return SourceTypePostgres
+	} else if !sourceModel.BigQuery.IsNull() {
+		return SourceTypeBigQuery
 	}
 	return SourceTypeUnknown
 }
@@ -1560,6 +1708,92 @@ func (c *ClickPipeResource) extractSourceFromPlan(ctx context.Context, diagnosti
 			return nil
 		}
 	}
+	} else if !sourceModel.BigQuery.IsNull() {
+		// BigQuery does not support updates
+		if isUpdate {
+			diagnostics.AddError(
+				"Error Updating ClickPipe",
+				"BigQuery ClickPipe sources cannot be updated. Please recreate the resource to change the configuration.",
+			)
+			return nil
+		}
+
+		bigQueryModel := models.ClickPipeBigQuerySourceModel{}
+		diagnostics.Append(sourceModel.BigQuery.As(ctx, &bigQueryModel, basetypes.ObjectAsOptions{})...)
+
+		// Extract credentials
+		credentialsModel := models.ClickPipeServiceAccountModel{}
+		diagnostics.Append(bigQueryModel.Credentials.As(ctx, &credentialsModel, basetypes.ObjectAsOptions{})...)
+
+		// Extract settings
+		settingsModel := models.ClickPipeBigQuerySettingsModel{}
+		diagnostics.Append(bigQueryModel.Settings.As(ctx, &settingsModel, basetypes.ObjectAsOptions{})...)
+
+		settings := api.ClickPipeBigQuerySettings{
+			ReplicationMode: settingsModel.ReplicationMode.ValueString(),
+		}
+
+		if !settingsModel.AllowNullableColumns.IsNull() {
+			val := settingsModel.AllowNullableColumns.ValueBool()
+			settings.AllowNullableColumns = &val
+		}
+		if !settingsModel.InitialLoadParallelism.IsNull() {
+			val := int(settingsModel.InitialLoadParallelism.ValueInt64())
+			settings.InitialLoadParallelism = &val
+		}
+		if !settingsModel.SnapshotNumRowsPerPartition.IsNull() {
+			val := int(settingsModel.SnapshotNumRowsPerPartition.ValueInt64())
+			settings.SnapshotNumRowsPerPartition = &val
+		}
+		if !settingsModel.SnapshotNumberOfParallelTables.IsNull() {
+			val := int(settingsModel.SnapshotNumberOfParallelTables.ValueInt64())
+			settings.SnapshotNumberOfParallelTables = &val
+		}
+
+		// Extract table mappings
+		tableMappingModels := make([]models.ClickPipeBigQueryTableMappingModel, len(bigQueryModel.TableMappings.Elements()))
+		diagnostics.Append(bigQueryModel.TableMappings.ElementsAs(ctx, &tableMappingModels, false)...)
+
+		tableMappings := make([]api.ClickPipeBigQueryTableMapping, len(tableMappingModels))
+		for i, mappingModel := range tableMappingModels {
+			mapping := api.ClickPipeBigQueryTableMapping{
+				SourceDatasetName: mappingModel.SourceDatasetName.ValueString(),
+				SourceTable:       mappingModel.SourceTable.ValueString(),
+				TargetTable:       mappingModel.TargetTable.ValueString(),
+			}
+
+			if !mappingModel.ExcludedColumns.IsNull() && len(mappingModel.ExcludedColumns.Elements()) > 0 {
+				excludedCols := make([]string, len(mappingModel.ExcludedColumns.Elements()))
+				diagnostics.Append(mappingModel.ExcludedColumns.ElementsAs(ctx, &excludedCols, false)...)
+				mapping.ExcludedColumns = excludedCols
+			}
+
+			if !mappingModel.UseCustomSortingKey.IsNull() {
+				val := mappingModel.UseCustomSortingKey.ValueBool()
+				mapping.UseCustomSortingKey = &val
+			}
+
+			if !mappingModel.SortingKeys.IsNull() && len(mappingModel.SortingKeys.Elements()) > 0 {
+				sortingKeys := make([]string, len(mappingModel.SortingKeys.Elements()))
+				diagnostics.Append(mappingModel.SortingKeys.ElementsAs(ctx, &sortingKeys, false)...)
+				mapping.SortingKeys = sortingKeys
+			}
+
+			if !mappingModel.TableEngine.IsNull() {
+				mapping.TableEngine = mappingModel.TableEngine.ValueStringPointer()
+			}
+
+			tableMappings[i] = mapping
+		}
+
+		source.BigQuery = &api.ClickPipeBigQuerySource{
+			SnapshotStagingPath: bigQueryModel.SnapshotStagingPath.ValueString(),
+			Settings:            settings,
+			Mappings:            tableMappings,
+			Credentials: &api.ClickPipeServiceAccount{
+				ServiceAccountFile: credentialsModel.ServiceAccountFile.ValueString(),
+			},
+		}
 	} else if !sourceModel.Postgres.IsNull() {
 		postgresModel := models.ClickPipePostgresSourceModel{}
 		diagnostics.Append(sourceModel.Postgres.As(ctx, &postgresModel, basetypes.ObjectAsOptions{})...)
@@ -1711,17 +1945,20 @@ func (c *ClickPipeResource) getStateCheckFunc(ctx context.Context, plan models.C
 		}
 	}
 
-	// Check if this is a snapshot-only Postgres pipe
+	// Check if this is a snapshot-only DB pipe (Postgres snapshot mode or BigQuery)
 	isSnapshotOnly := false
 	var sourceModel models.ClickPipeSourceModel
 	if diags := plan.Source.As(ctx, &sourceModel, basetypes.ObjectAsOptions{}); diags == nil {
-		if !sourceModel.Postgres.IsNull() {
+		// BigQuery is always snapshot-only
+		if !sourceModel.BigQuery.IsNull() {
+			isSnapshotOnly = true
+		} else if !sourceModel.Postgres.IsNull() {
 			var postgresSource models.ClickPipePostgresSourceModel
 			if diags := sourceModel.Postgres.As(ctx, &postgresSource, basetypes.ObjectAsOptions{}); diags == nil {
 				if !postgresSource.Settings.IsNull() {
 					var settings models.ClickPipePostgresSettingsModel
 					if diags := postgresSource.Settings.As(ctx, &settings, basetypes.ObjectAsOptions{}); diags == nil {
-						isSnapshotOnly = settings.ReplicationMode.ValueString() == api.ClickPipePostgresReplicationModeSnapshot
+						isSnapshotOnly = settings.ReplicationMode.ValueString() == api.ClickPipeReplicationModeSnapshot
 					}
 				}
 			}
@@ -2132,10 +2369,139 @@ func (c *ClickPipeResource) syncClickPipeState(ctx context.Context, state *model
 		sourceModel.Postgres = types.ObjectNull(models.ClickPipePostgresSourceModel{}.ObjectType().AttrTypes)
 	}
 
+	if clickPipe.Source.BigQuery != nil {
+		stateBigQueryModel := models.ClickPipeBigQuerySourceModel{}
+		var stateSettingsModel models.ClickPipeBigQuerySettingsModel
+
+		if !stateSourceModel.BigQuery.IsNull() {
+			if diags := stateSourceModel.BigQuery.As(ctx, &stateBigQueryModel, basetypes.ObjectAsOptions{}); diags.HasError() {
+				return fmt.Errorf("error reading ClickPipe BigQuery source: %v", diags)
+			}
+			// Get the state settings model
+			if !stateBigQueryModel.Settings.IsNull() {
+				if diags := stateBigQueryModel.Settings.As(ctx, &stateSettingsModel, basetypes.ObjectAsOptions{}); diags.HasError() {
+					return fmt.Errorf("error reading ClickPipe BigQuery settings: %v", diags)
+				}
+			}
+		}
+
+		// Settings - preserve null values from state for optional fields
+		settingsModel := models.ClickPipeBigQuerySettingsModel{
+			ReplicationMode: types.StringValue(clickPipe.Source.BigQuery.Settings.ReplicationMode),
+		}
+
+		if clickPipe.Source.BigQuery.Settings.AllowNullableColumns != nil {
+			settingsModel.AllowNullableColumns = types.BoolValue(*clickPipe.Source.BigQuery.Settings.AllowNullableColumns)
+		} else {
+			settingsModel.AllowNullableColumns = types.BoolNull()
+		}
+
+		if clickPipe.Source.BigQuery.Settings.InitialLoadParallelism != nil {
+			settingsModel.InitialLoadParallelism = types.Int64Value(int64(*clickPipe.Source.BigQuery.Settings.InitialLoadParallelism))
+		} else {
+			settingsModel.InitialLoadParallelism = types.Int64Null()
+		}
+
+		if clickPipe.Source.BigQuery.Settings.SnapshotNumRowsPerPartition != nil {
+			settingsModel.SnapshotNumRowsPerPartition = types.Int64Value(int64(*clickPipe.Source.BigQuery.Settings.SnapshotNumRowsPerPartition))
+		} else {
+			settingsModel.SnapshotNumRowsPerPartition = types.Int64Null()
+		}
+
+		if clickPipe.Source.BigQuery.Settings.SnapshotNumberOfParallelTables != nil {
+			settingsModel.SnapshotNumberOfParallelTables = types.Int64Value(int64(*clickPipe.Source.BigQuery.Settings.SnapshotNumberOfParallelTables))
+		} else {
+			settingsModel.SnapshotNumberOfParallelTables = types.Int64Null()
+		}
+
+		// Table mappings - preserve null values from state
+		var stateTableMappings []models.ClickPipeBigQueryTableMappingModel
+		if !stateBigQueryModel.TableMappings.IsNull() && len(stateBigQueryModel.TableMappings.Elements()) > 0 {
+			stateTableMappings = make([]models.ClickPipeBigQueryTableMappingModel, len(stateBigQueryModel.TableMappings.Elements()))
+			stateBigQueryModel.TableMappings.ElementsAs(ctx, &stateTableMappings, false)
+		}
+
+		tableMappingList := make([]attr.Value, len(clickPipe.Source.BigQuery.Mappings))
+		for i, mapping := range clickPipe.Source.BigQuery.Mappings {
+			tableMappingModel := models.ClickPipeBigQueryTableMappingModel{
+				SourceDatasetName: types.StringValue(mapping.SourceDatasetName),
+				SourceTable:       types.StringValue(mapping.SourceTable),
+				TargetTable:       types.StringValue(mapping.TargetTable),
+			}
+
+			// Get corresponding state mapping if it exists
+			var stateMapping *models.ClickPipeBigQueryTableMappingModel
+			if i < len(stateTableMappings) {
+				stateMapping = &stateTableMappings[i]
+			}
+
+			if len(mapping.ExcludedColumns) > 0 {
+				excludedColsList := make([]attr.Value, len(mapping.ExcludedColumns))
+				for j, col := range mapping.ExcludedColumns {
+					excludedColsList[j] = types.StringValue(col)
+				}
+				tableMappingModel.ExcludedColumns, _ = types.ListValue(types.StringType, excludedColsList)
+			} else {
+				tableMappingModel.ExcludedColumns = types.ListNull(types.StringType)
+			}
+
+			if mapping.UseCustomSortingKey != nil {
+				tableMappingModel.UseCustomSortingKey = types.BoolValue(*mapping.UseCustomSortingKey)
+			} else {
+				tableMappingModel.UseCustomSortingKey = types.BoolNull()
+			}
+
+			if len(mapping.SortingKeys) > 0 {
+				sortingKeysList := make([]attr.Value, len(mapping.SortingKeys))
+				for j, key := range mapping.SortingKeys {
+					sortingKeysList[j] = types.StringValue(key)
+				}
+				tableMappingModel.SortingKeys, _ = types.ListValue(types.StringType, sortingKeysList)
+			} else {
+				tableMappingModel.SortingKeys = types.ListNull(types.StringType)
+			}
+
+			// For table_engine, preserve null from state if it was null (API may return default)
+			if stateMapping != nil && stateMapping.TableEngine.IsNull() {
+				tableMappingModel.TableEngine = types.StringNull()
+			} else if mapping.TableEngine != nil && *mapping.TableEngine != "" {
+				tableMappingModel.TableEngine = types.StringValue(*mapping.TableEngine)
+			} else if stateMapping != nil {
+				tableMappingModel.TableEngine = stateMapping.TableEngine
+			} else {
+				tableMappingModel.TableEngine = types.StringNull()
+			}
+
+			tableMappingList[i] = tableMappingModel.ObjectValue()
+		}
+
+		bigQueryModel := models.ClickPipeBigQuerySourceModel{
+			SnapshotStagingPath: types.StringValue(clickPipe.Source.BigQuery.SnapshotStagingPath),
+			Settings:            settingsModel.ObjectValue(),
+			TableMappings:       types.ListNull(models.ClickPipeBigQueryTableMappingModel{}.ObjectType()),
+		}
+
+		if len(tableMappingList) > 0 {
+			bigQueryModel.TableMappings, _ = types.ListValue(models.ClickPipeBigQueryTableMappingModel{}.ObjectType(), tableMappingList)
+		}
+
+		// Preserve credentials from state as API doesn't return them
+		if !stateBigQueryModel.Credentials.IsNull() {
+			bigQueryModel.Credentials = stateBigQueryModel.Credentials
+		} else {
+			bigQueryModel.Credentials = types.ObjectNull(models.ClickPipeServiceAccountModel{}.ObjectType().AttrTypes)
+		}
+
+		sourceModel.BigQuery = bigQueryModel.ObjectValue()
+	} else {
+		sourceModel.BigQuery = types.ObjectNull(models.ClickPipeBigQuerySourceModel{}.ObjectType().AttrTypes)
+	}
+
 	state.Source = sourceModel.ObjectValue()
 
-	// Check if this is a Postgres CDC pipe
 	isPostgresPipe := clickPipe.Source.Postgres != nil
+	isBigQueryPipe := clickPipe.Source.BigQuery != nil
+	isDBPipe := isPostgresPipe || isBigQueryPipe
 
 	stateDestinationModel := models.ClickPipeDestinationModel{}
 	if !state.Destination.IsNull() {
@@ -2148,15 +2514,15 @@ func (c *ClickPipeResource) syncClickPipeState(ctx context.Context, state *model
 		Database: types.StringValue(clickPipe.Destination.Database),
 	}
 
-	// For Postgres CDC, table/columns/tableDefinition are always null (managed via table_mappings)
+	// For DB pipes, table/columns/tableDefinition are always null (managed via table_mappings)
 	// But managed_table should be preserved from state since user can configure it
-	if isPostgresPipe {
+	if isDBPipe {
 		destinationModel.Table = types.StringNull()
-		destinationModel.ManagedTable = types.BoolValue(false) // Always false for Postgres pipes
+		destinationModel.ManagedTable = types.BoolValue(false) // Always false for DB pipes
 		destinationModel.Columns = types.ListNull(models.ClickPipeDestinationColumnModel{}.ObjectType())
 		destinationModel.TableDefinition = types.ObjectNull(models.ClickPipeDestinationTableDefinitionModel{}.ObjectType().AttrTypes)
 	} else {
-		// For non-Postgres sources, use API response
+		// For non-DB sources, use API response
 		if clickPipe.Destination.Table != nil {
 			destinationModel.Table = types.StringValue(*clickPipe.Destination.Table)
 		} else {
@@ -2180,8 +2546,8 @@
 		destinationModel.Roles = types.ListNull(types.StringType)
 	}
 
-	// Only process columns and table definition for non-Postgres sources
-	if !isPostgresPipe {
+	// Only process columns and table definition for non-DB sources
+	if !isDBPipe {
 		columnList := make([]attr.Value, len(clickPipe.Destination.Columns))
 		for i, column := range clickPipe.Destination.Columns {
 			columnList[i] = models.ClickPipeDestinationColumnModel{
@@ -2193,7 +2559,7 @@
 		destinationModel.Columns, _ = types.ListValue(models.ClickPipeDestinationColumnModel{}.ObjectType(), columnList)
 	}
 
-	if !isPostgresPipe && clickPipe.Destination.TableDefinition != nil {
+	if !isDBPipe && clickPipe.Destination.TableDefinition != nil {
 		engineModel := models.ClickPipeDestinationTableEngineModel{
 			Type:            types.StringValue(normalizeEngineType(clickPipe.Destination.TableDefinition.Engine.Type)),
 			VersionColumnID: types.StringNull(),
diff --git a/pkg/resource/models/clickpipe_resource.go b/pkg/resource/models/clickpipe_resource.go
index ce536572..6c76baf5 100644
--- a/pkg/resource/models/clickpipe_resource.go
+++ b/pkg/resource/models/clickpipe_resource.go
@@ -420,11 +420,123 @@ func (m ClickPipeObjectStorageSourceModel) ObjectValue() types.Object {
 	})
 }
 
+type ClickPipeServiceAccountModel struct {
+	ServiceAccountFile types.String `tfsdk:"service_account_file"`
+}
+
+func (m ClickPipeServiceAccountModel) ObjectType() types.ObjectType {
+	return types.ObjectType{
+		AttrTypes: map[string]attr.Type{
+			"service_account_file": types.StringType,
+		},
+	}
+}
+
+func (m ClickPipeServiceAccountModel) ObjectValue() types.Object {
+	return types.ObjectValueMust(m.ObjectType().AttrTypes, map[string]attr.Value{
+		"service_account_file": m.ServiceAccountFile,
+	})
+}
+
+type ClickPipeBigQuerySettingsModel struct {
+	ReplicationMode                types.String `tfsdk:"replication_mode"`
+	AllowNullableColumns           types.Bool   `tfsdk:"allow_nullable_columns"`
+	InitialLoadParallelism         types.Int64  `tfsdk:"initial_load_parallelism"`
+	SnapshotNumRowsPerPartition    types.Int64  `tfsdk:"snapshot_num_rows_per_partition"`
+	SnapshotNumberOfParallelTables types.Int64  `tfsdk:"snapshot_number_of_parallel_tables"`
+}
+
+func (m ClickPipeBigQuerySettingsModel) ObjectType() types.ObjectType {
+	return types.ObjectType{
+		AttrTypes: map[string]attr.Type{
+			"replication_mode":                   types.StringType,
+			"allow_nullable_columns":             types.BoolType,
+			"initial_load_parallelism":           types.Int64Type,
+			"snapshot_num_rows_per_partition":    types.Int64Type,
+			"snapshot_number_of_parallel_tables": types.Int64Type,
+		},
+	}
+}
+
+func (m ClickPipeBigQuerySettingsModel) ObjectValue() types.Object {
+	return types.ObjectValueMust(m.ObjectType().AttrTypes, map[string]attr.Value{
+		"replication_mode":                   m.ReplicationMode,
+		"allow_nullable_columns":             m.AllowNullableColumns,
+		"initial_load_parallelism":           m.InitialLoadParallelism,
+		"snapshot_num_rows_per_partition":    m.SnapshotNumRowsPerPartition,
+		"snapshot_number_of_parallel_tables": m.SnapshotNumberOfParallelTables,
+	})
+}
+
+type ClickPipeBigQueryTableMappingModel struct {
+	SourceDatasetName   types.String `tfsdk:"source_dataset_name"`
+	SourceTable         types.String `tfsdk:"source_table"`
+	TargetTable         types.String `tfsdk:"target_table"`
+	ExcludedColumns     types.List   `tfsdk:"excluded_columns"`
+	UseCustomSortingKey types.Bool   `tfsdk:"use_custom_sorting_key"`
+	SortingKeys         types.List   `tfsdk:"sorting_keys"`
+	TableEngine         types.String `tfsdk:"table_engine"`
+}
+
+func (m ClickPipeBigQueryTableMappingModel) ObjectType() types.ObjectType {
+	return types.ObjectType{
+		AttrTypes: map[string]attr.Type{
+			"source_dataset_name":    types.StringType,
+			"source_table":           types.StringType,
+			"target_table":           types.StringType,
+			"excluded_columns":       types.ListType{ElemType: types.StringType},
+			"use_custom_sorting_key": types.BoolType,
+			"sorting_keys":           types.ListType{ElemType: types.StringType},
+			"table_engine":           types.StringType,
+		},
+	}
+}
+
+func (m ClickPipeBigQueryTableMappingModel) ObjectValue() types.Object {
+	return types.ObjectValueMust(m.ObjectType().AttrTypes, map[string]attr.Value{
+		"source_dataset_name":    m.SourceDatasetName,
+		"source_table":           m.SourceTable,
+		"target_table":           m.TargetTable,
+		"excluded_columns":       m.ExcludedColumns,
+		"use_custom_sorting_key": m.UseCustomSortingKey,
+		"sorting_keys":           m.SortingKeys,
+		"table_engine":           m.TableEngine,
+	})
+}
+
+type ClickPipeBigQuerySourceModel struct {
+	SnapshotStagingPath types.String `tfsdk:"snapshot_staging_path"`
+	Settings            types.Object `tfsdk:"settings"`
+	TableMappings       types.List   `tfsdk:"table_mappings"`
+	Credentials         types.Object `tfsdk:"credentials"`
+}
+
+func (m ClickPipeBigQuerySourceModel) ObjectType() types.ObjectType {
+	return types.ObjectType{
+		AttrTypes: map[string]attr.Type{
+			"snapshot_staging_path": types.StringType,
+			"settings":              ClickPipeBigQuerySettingsModel{}.ObjectType(),
+			"table_mappings":        types.ListType{ElemType: ClickPipeBigQueryTableMappingModel{}.ObjectType()},
+			"credentials":           ClickPipeServiceAccountModel{}.ObjectType(),
+		},
+	}
+}
+
+func (m ClickPipeBigQuerySourceModel) ObjectValue() types.Object {
+	return types.ObjectValueMust(m.ObjectType().AttrTypes, map[string]attr.Value{
+		"snapshot_staging_path": m.SnapshotStagingPath,
+		"settings":              m.Settings,
+		"table_mappings":        m.TableMappings,
+		"credentials":           m.Credentials,
+	})
+}
+
 type ClickPipeSourceModel struct {
 	Kafka         types.Object `tfsdk:"kafka"`
 	ObjectStorage types.Object `tfsdk:"object_storage"`
 	Kinesis       types.Object `tfsdk:"kinesis"`
 	Postgres      types.Object `tfsdk:"postgres"`
+	BigQuery      types.Object `tfsdk:"bigquery"`
 }
 
 func (m ClickPipeSourceModel) ObjectType() types.ObjectType {
@@ -434,6 +546,7 @@ func (m ClickPipeSourceModel) ObjectType() types.ObjectType {
 		"object_storage": ClickPipeObjectStorageSourceModel{}.ObjectType(),
 		"kinesis":        ClickPipeKinesisSourceModel{}.ObjectType(),
 		"postgres":       ClickPipePostgresSourceModel{}.ObjectType(),
+		"bigquery":       ClickPipeBigQuerySourceModel{}.ObjectType(),
 	},
 	}
 }
@@ -444,6 +557,7 @@ func (m ClickPipeSourceModel) ObjectValue() types.Object {
 		"object_storage": m.ObjectStorage,
 		"kinesis":        m.Kinesis,
 		"postgres":       m.Postgres,
+		"bigquery":       m.BigQuery,
 	})
 }
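+
+// For reference, a populated ClickPipeBigQuerySourceModel corresponds to an API
+// payload shaped roughly as follows (field names follow the struct tags on
+// api.ClickPipeBigQuerySource; values are illustrative):
+//
+//	{
+//	  "bigquery": {
+//	    "snapshotStagingPath": "gs://my-staging-bucket/",
+//	    "settings": { "replicationMode": "snapshot" },
+//	    "tableMappings": [
+//	      {
+//	        "sourceDatasetName": "test_dataset",
+//	        "sourceTable": "test_table",
+//	        "targetTable": "test_table_snapshot"
+//	      }
+//	    ],
+//	    "credentials": { "serviceAccountFile": "<base64-encoded JSON key>" }
+//	  }
+//	}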