|
15 | 15 | USERNAME = os.getenv("USERNAME", "") |
16 | 16 | USER_PWD = os.getenv("USERPWD", "") |
17 | 17 | DB_NAME = os.getenv("DB_NAME", "") |
| 18 | +KINESIS_TARGET = os.getenv("KINESIS_TARGET", "default") |
18 | 19 | SCHEMA_NAME = "public" |
19 | 20 |
|
20 | 21 |
|
@@ -227,23 +228,36 @@ def create_postgres_access_role(stack: Stack, postgres_secret: secretsmanager.Cf |
227 | 228 |
|
228 | 229 |
|
229 | 230 | def create_kinesis_target_endpoint(stack: Stack, target_stream: kinesis.Stream, dms_assume_role: iam.Role) -> dms.CfnEndpoint: |
| 231 | + if str.lower(KINESIS_TARGET) == "non-default": |
| 232 | + return dms.CfnEndpoint( |
| 233 | + stack, |
| 234 | + "target", |
| 235 | + endpoint_type="target", |
| 236 | + engine_name="kinesis", |
| 237 | + kinesis_settings=dms.CfnEndpoint.KinesisSettingsProperty( |
| 238 | + stream_arn=target_stream.stream_arn, |
| 239 | + message_format="json", |
| 240 | + service_access_role_arn=dms_assume_role.role_arn, |
| 241 | + include_control_details=True, |
| 242 | + include_null_and_empty=True, |
| 243 | + include_partition_value=True, |
| 244 | + include_table_alter_operations=True, |
| 245 | + include_transaction_details=False, |
| 246 | + partition_include_schema_table=True, |
| 247 | + ), |
| 248 | + ) |
| 249 | + |
230 | 250 | return dms.CfnEndpoint( |
231 | | - stack, |
232 | | - "target", |
233 | | - endpoint_type="target", |
234 | | - engine_name="kinesis", |
235 | | - kinesis_settings=dms.CfnEndpoint.KinesisSettingsProperty( |
236 | | - stream_arn=target_stream.stream_arn, |
237 | | - message_format="json", |
238 | | - service_access_role_arn=dms_assume_role.role_arn, |
239 | | - include_control_details=True, |
240 | | - include_null_and_empty=True, |
241 | | - include_partition_value=True, |
242 | | - include_table_alter_operations=True, |
243 | | - include_transaction_details=False, |
244 | | - partition_include_schema_table=True, |
245 | | - ), |
246 | | - ) |
| 251 | + stack, |
| 252 | + "target", |
| 253 | + endpoint_type="target", |
| 254 | + engine_name="kinesis", |
| 255 | + kinesis_settings=dms.CfnEndpoint.KinesisSettingsProperty( |
| 256 | + stream_arn=target_stream.stream_arn, |
| 257 | + message_format="json", |
| 258 | + service_access_role_arn=dms_assume_role.role_arn, |
| 259 | + ), |
| 260 | + ) |
247 | 261 |
|
248 | 262 |
|
249 | 263 | def create_replication_instance( |
|
0 commit comments