A powerful Python framework for unit testing SQL queries with mock data injection
Test SQL queries across BigQuery, Snowflake, Redshift, Athena, Trino, and DuckDB with type-safe mock data, pytest integration, and automatic table resolution. Perfect for data engineering, ETL pipeline testing, and analytics validation.
Quick Links: Installation | Quick Start | Documentation | Examples | PyPI Package
SQL testing in data engineering can be challenging, especially when working with large datasets and complex queries across multiple database platforms. This library was born from real-world production needs at scale, addressing the pain points of:
- Fragile Integration Tests: Traditional tests that depend on live data break when data changes
- Slow Feedback Loops: Running tests against full datasets takes too long for CI/CD
- Database Engine Upgrades: UDF semantics and SQL behavior change between database versions, causing silent production failures
- Database Lock-in: Tests written for one database don't work on another
- Complex Setup: Each database requires different mocking strategies and tooling
For more details on our journey and the engineering challenges we solved, read the full story: "Our Journey to Building a Scalable SQL Testing Library for Athena"
- ETL Pipeline Testing: Validate data transformations with controlled input data
- Data Quality Assurance: Test data validation rules and business logic in SQL
- Schema Migration Testing: Ensure queries work correctly after schema changes
- Database Engine Upgrades: Catch breaking changes in SQL UDF semantics across database versions before they hit production
- Cross-Database Compatibility: Write tests once, run on multiple database platforms
- Report Validation: Test analytical queries with known datasets to verify results
- A/B Test Analysis: Validate statistical calculations and business metrics
- Dashboard Backend Testing: Ensure dashboard queries return expected data structures
- Fast Feedback: Run comprehensive SQL tests in seconds, not minutes
- Isolated Testing: Tests don't interfere with production data or other tests
- Cost Optimization: Reduce cloud database costs by avoiding large dataset queries in tests
- Multi-Database Support: Test SQL across BigQuery, Athena, Redshift, Trino, Snowflake, and DuckDB
- Mock Data Injection: Use Python dataclasses for type-safe test data
- CTE or Physical Tables: Automatic fallback for query size limits
- Type-Safe Results: Deserialize results to Pydantic models
- Pytest Integration: Seamless testing with the `@sql_test` decorator
- SQL Logging: Comprehensive SQL logging with formatted output, error traces, and temp table queries
The library supports different data types across database engines. All checkmarks indicate comprehensive test coverage with verified functionality.
| Data Type | Python Type | BigQuery | Athena | Redshift | Trino | Snowflake | DuckDB |
|---|---|---|---|---|---|---|---|
| String | str | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Integer | int | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Float | float | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Boolean | bool | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Date | date | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Datetime | datetime | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Decimal | Decimal | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Optional | Optional[T] | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Data Type | Python Type | BigQuery | Athena | Redshift | Trino | Snowflake | DuckDB |
|---|---|---|---|---|---|---|---|
| String Array | List[str] | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Integer Array | List[int] | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Decimal Array | List[Decimal] | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Optional Array | Optional[List[T]] | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Map/Dict | Dict[K, V] | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Struct/Record | dataclass | ✅ | ✅ | ❌ | ✅ | ❌ | ✅ |
| Nested Arrays | List[List[T]] | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ |
- BigQuery: NULL arrays become empty arrays `[]`; uses scientific notation for large decimals; dict/map types stored as JSON strings; struct types supported using `STRUCT` syntax with named fields (dataclasses and Pydantic models)
- Athena: 256KB query size limit; supports arrays and maps using `ARRAY[]` and `MAP(ARRAY[], ARRAY[])` syntax; supports struct types using `ROW` with named fields (dataclasses and Pydantic models)
- Redshift: Arrays and maps implemented via SUPER type (JSON parsing); 16MB query size limit; struct types not yet supported
- Trino: Memory catalog for testing; excellent decimal precision; supports arrays, maps, and struct types using `ROW` with named fields (dataclasses and Pydantic models)
- Snowflake: Column names normalized to lowercase; 1MB query size limit; dict/map types implemented via VARIANT type (JSON parsing); struct types not yet supported
- DuckDB: Fast embedded analytics database; excellent SQL standards compliance; supports arrays, maps, and struct types using `STRUCT` syntax with named fields (dataclasses and Pydantic models)
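As a point of reference, the complex types above are declared with ordinary Python typing on the row class; the adapter handles the SQL rendering. A minimal sketch (the table and field names are illustrative, following the same BaseMockTable pattern shown in the Quick Start below):

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Dict, List, Optional

from sql_testing_library.mock_table import BaseMockTable


@dataclass
class Event:
    event_id: int
    tags: List[str]             # string array column
    scores: List[Decimal]       # decimal array column
    properties: Dict[str, str]  # map/dict column
    revenue: Optional[Decimal]  # nullable column


class EventsMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "test_db"

    def get_table_name(self) -> str:
        return "events"


# Rows are plain dataclass instances; each adapter renders them in its own syntax.
events = EventsMockTable([
    Event(1, ["signup", "mobile"], [Decimal("0.95")], {"country": "US"}, Decimal("10.50")),
    Event(2, [], [], {}, None),
])
```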
The library supports two execution modes for mock data injection. CTE Mode is the default and is automatically used unless Physical Tables mode is explicitly requested or required due to query size limits.
| Execution Mode | Description | BigQuery | Athena | Redshift | Trino | Snowflake | DuckDB |
|---|---|---|---|---|---|---|---|
| CTE Mode | Mock data injected as Common Table Expressions | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Physical Tables | Mock data created as temporary tables | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
- Default Behavior: Used automatically for all tests unless overridden
- Data Injection: Mock data is injected as Common Table Expressions (CTEs) within the SQL query
- No Physical Objects: No actual tables are created in the database
- Memory-Based: All data exists only for the duration of the query execution
- Compatibility: Works with all database engines
- Query Size: Subject to database-specific query size limits (see table below)
- When Used: Automatically activated when CTE queries exceed size limits, or explicitly requested with `use_physical_tables=True`
- Table Creation: Creates actual temporary tables in the database with mock data
- Table Types by Database:
| Database | Table Type | Schema/Location | Cleanup Method | Cleanup Timing |
|---|---|---|---|---|
| BigQuery | Standard tables | Project dataset | Library executes `client.delete_table()` | After each test |
| Athena | External tables | S3-backed external tables | Library executes `DROP TABLE` | After each test |
| Redshift | Temporary tables | Session-specific temp schema | Database automatic | Session end |
| Trino | Memory tables | memory.default schema | Library executes `DROP TABLE` | After each test |
| Snowflake | Temporary tables | Session-specific temp schema | Database automatic | Session end |
| DuckDB | Temporary tables | Database-specific temp schema | Library executes `DROP TABLE` | After each test |
Library-Managed Cleanup (BigQuery, Athena, Trino, DuckDB):
- The SQL Testing Library explicitly calls cleanup methods after each test
- BigQuery: Creates standard tables in your dataset, then deletes them via `client.delete_table()`
- Athena: Creates external tables backed by S3 data, then drops table metadata via `DROP TABLE IF EXISTS` (⚠️ S3 data files remain and require separate cleanup)
- Trino: Creates tables in memory catalog, then drops them via `DROP TABLE IF EXISTS`
- DuckDB: Creates temporary tables in the database, then drops them via `DROP TABLE IF EXISTS`
Database-Managed Cleanup (Redshift, Snowflake):
- These databases have built-in temporary table mechanisms
- Redshift: Uses `CREATE TEMPORARY TABLE` - automatically dropped when session ends
- Snowflake: Uses `CREATE TEMPORARY TABLE` - automatically dropped when session ends
- The library's cleanup method is a no-op for these databases
Why the Difference?
- Athena & Trino: Don't have true temporary table features, so library manages cleanup
- BigQuery: Has temporary tables, but library uses standard tables for better control
- Redshift & Snowflake: Have robust temporary table features that handle cleanup automatically
Q: Why does Athena require "manual" cleanup while others are automatic?
A: Athena creates external tables backed by S3 data. The library automatically calls DROP TABLE after each test, which removes the table metadata from AWS Glue catalog. However, the actual S3 data files remain and must be cleaned up separately - either manually or through S3 lifecycle policies. This two-step cleanup process is why it's considered "manual" compared to true temporary tables.
Q: What does "explicit cleanup" mean for Trino?
A: Trino's memory catalog doesn't automatically clean up tables when sessions end. The library explicitly calls DROP TABLE IF EXISTS after each test to remove the tables. Like Athena, if a test fails catastrophically, some tables might persist until the Trino server restarts.
Q: What is the TTL (Time To Live) for BigQuery tables?
A: BigQuery tables created by the library are standard tables without TTL - they persist until explicitly deleted. The library immediately calls client.delete_table() after each test. If you want to set TTL as a safety net, you can configure it at the dataset level (e.g., 24 hours) to auto-delete any orphaned tables.
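For example, a dataset-level default table expiration can be set with the google-cloud-bigquery client. This is a sketch independent of the library itself; the project and dataset names are placeholders:

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-test-project")
dataset = client.get_dataset("my-test-project.test_dataset")

# Tables created in this dataset without an explicit expiration are
# deleted automatically after 24 hours, catching any orphaned temp tables.
dataset.default_table_expiration_ms = 24 * 60 * 60 * 1000
client.update_dataset(dataset, ["default_table_expiration_ms"])
```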
Q: Which databases leave artifacts if tests crash?
- BigQuery, Athena, Trino, DuckDB: May leave tables if library crashes before cleanup
- Redshift, Snowflake: No artifacts - temporary tables auto-cleanup on session end
Q: How to manually clean up orphaned tables?
-- BigQuery: List and delete tables with temp prefix
SELECT table_name FROM `project.dataset.INFORMATION_SCHEMA.TABLES`
WHERE table_name LIKE 'temp_%';
-- Athena: List and drop tables with temp prefix
SHOW TABLES LIKE 'temp_%';
DROP TABLE temp_table_name;
-- Trino: List and drop tables with temp prefix
SHOW TABLES FROM memory.default LIKE 'temp_%';
DROP TABLE memory.default.temp_table_name;
-- DuckDB: List and drop tables with temp prefix
SHOW TABLES;
DROP TABLE temp_table_name;
Q: How to handle S3 cleanup for Athena tables?
Athena external tables store data in S3. When DROP TABLE is called, only the table metadata is removed from AWS Glue catalog - S3 data files remain. Here are cleanup options:
Option 1: S3 Lifecycle Policy (Recommended)
{
"Rules": [
{
"ID": "DeleteSQLTestingTempFiles",
"Status": "Enabled",
"Filter": {
"Prefix": "temp_"
},
"Expiration": {
"Days": 1
}
}
]
}
Option 2: Manual S3 Cleanup
# List temp files in your Athena results bucket
aws s3 ls s3://your-athena-results-bucket/ --recursive | grep temp_
# Delete all objects with the temp_ prefix
aws s3 rm s3://your-athena-results-bucket/ --recursive --exclude "*" --include "temp_*"
Option 3: Automated Cleanup Script
#!/bin/bash
# Delete S3 objects older than 1 day with temp_ prefix
aws s3api list-objects-v2 --bucket your-athena-results-bucket --prefix "temp_" \
--query 'Contents[?LastModified<=`2024-01-01`].Key' --output text | \
xargs -I {} aws s3 rm s3://your-athena-results-bucket/{}
| Database | CTE Query Size Limit | Physical Tables Threshold |
|---|---|---|
| BigQuery | ~1MB (estimated) | Large dataset or complex CTEs |
| Athena | 256KB | Automatically switches at 256KB |
| Redshift | 16MB | Automatically switches at 16MB |
| Trino | 16MB (estimated) | Large dataset or complex CTEs |
| Snowflake | 1MB | Automatically switches at 1MB |
| DuckDB | 32MB (estimated) | Large dataset or complex CTEs |
# Default: CTE Mode (recommended for most use cases)
@sql_test(mock_tables=[...], result_class=ResultClass)
def test_default_mode():
return TestCase(query="SELECT * FROM table")
# Explicit CTE Mode
@sql_test(mock_tables=[...], result_class=ResultClass)
def test_explicit_cte():
return TestCase(
query="SELECT * FROM table",
use_physical_tables=False # Explicit CTE mode
)
# Explicit Physical Tables Mode
@sql_test(mock_tables=[...], result_class=ResultClass)
def test_physical_tables():
return TestCase(
query="SELECT * FROM table",
use_physical_tables=True # Force physical tables
)
# Physical Tables with Custom Parallel Settings
@sql_test(
mock_tables=[...],
result_class=ResultClass,
use_physical_tables=True,
max_workers=4 # Customize parallel execution
)
def test_with_custom_parallelism():
    return TestCase(query="SELECT * FROM table")
Notes:
- CTE Mode: Default mode, works with all database engines, suitable for most use cases
- Physical Tables: Used automatically when CTE queries exceed database size limits or when explicitly requested
- Parallel Table Creation: When using physical tables with multiple mock tables, they are created in parallel by default for better performance
- Snowflake: Full support for both CTE and physical table modes
When using `use_physical_tables=True` with multiple mock tables, the library can create and clean up tables in parallel for better performance.
Default Behavior:
- Parallel creation is enabled by default when using physical tables
- Smart worker allocation based on table count:
- 1-2 tables: Same number of workers as tables
- 3-5 tables: 3 workers
- 6-10 tables: 5 workers
- 11+ tables: 8 workers (capped)
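The allocation above corresponds roughly to the following mapping (an illustrative sketch; the function name is hypothetical and the library's internal implementation may differ):

```python
def default_max_workers(table_count: int) -> int:
    """Approximate the smart worker allocation described above."""
    if table_count <= 2:
        return table_count  # 1-2 tables: one worker per table
    if table_count <= 5:
        return 3            # 3-5 tables
    if table_count <= 10:
        return 5            # 6-10 tables
    return 8                # 11+ tables: capped
```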
Customization:
# Disable parallel creation
@sql_test(use_physical_tables=True, parallel_table_creation=False)
# Custom worker count
@sql_test(use_physical_tables=True, max_workers=2)
# In SQLTestCase directly
TestCase(
query="...",
use_physical_tables=True,
parallel_table_creation=True, # Default
max_workers=4 # Custom worker limit
)
Default Behavior:
- Parallel cleanup is enabled by default when using physical tables
- Uses the same smart worker allocation as table creation
- Cleanup errors are logged as warnings (best-effort cleanup)
Customization:
# Disable parallel cleanup
@sql_test(use_physical_tables=True, parallel_table_cleanup=False)
# Custom worker count for both creation and cleanup
@sql_test(use_physical_tables=True, max_workers=2)
# In SQLTestCase directly
TestCase(
query="...",
use_physical_tables=True,
parallel_table_creation=True, # Default
parallel_table_cleanup=True, # Default
max_workers=4 # Custom worker limit for both operations
)
Performance Benefits:
- Both table creation and cleanup operations are parallelized when multiple tables are involved
- Significantly reduces test execution time for tests with many mock tables
- Particularly beneficial for cloud databases where network latency is a factor
# Install with BigQuery support
pip install sql-testing-library[bigquery]
# Install with Athena support
pip install sql-testing-library[athena]
# Install with Redshift support
pip install sql-testing-library[redshift]
# Install with Trino support
pip install sql-testing-library[trino]
# Install with Snowflake support
pip install sql-testing-library[snowflake]
# Install with DuckDB support
pip install sql-testing-library[duckdb]
# Or install with all database adapters
pip install sql-testing-library[all]
# Install base dependencies
poetry install
# Install with specific database support
poetry install --with bigquery
poetry install --with athena
poetry install --with redshift
poetry install --with trino
poetry install --with snowflake
poetry install --with duckdb
# Install with all database adapters and dev tools
poetry install --with bigquery,athena,redshift,trino,snowflake,duckdb,dev
- Configure your database in `pytest.ini`:
[sql_testing]
adapter = bigquery # Use 'bigquery', 'athena', 'redshift', 'trino', 'snowflake', or 'duckdb'
# BigQuery configuration
[sql_testing.bigquery]
project_id = <my-test-project>
dataset_id = <test_dataset>
credentials_path = <path to credentials json>
# Athena configuration
# [sql_testing.athena]
# database = <test_database>
# s3_output_location = s3://my-athena-results/
# region = us-west-2
# aws_access_key_id = <optional> # Optional: if not using default credentials
# aws_secret_access_key = <optional> # Optional: if not using default credentials
# Redshift configuration
# [sql_testing.redshift]
# host = <redshift-host.example.com>
# database = <test_database>
# user = <redshift_user>
# password = <redshift_password>
# port = <5439> # Optional: default port is 5439
# Trino configuration
# [sql_testing.trino]
# host = <trino-host.example.com>
# port = <8080> # Optional: default port is 8080
# user = <trino_user>
# catalog = <memory> # Optional: default catalog is 'memory'
# schema = <default> # Optional: default schema is 'default'
# http_scheme = <http> # Optional: default is 'http', use 'https' for secure connections
#
# # Authentication configuration (choose one method)
# # For Basic Authentication:
# auth_type = basic
# password = <trino_password>
#
# # For JWT Authentication:
# # auth_type = jwt
# # token = <jwt_token>
# Snowflake configuration
# [sql_testing.snowflake]
# account = <account-identifier>
# user = <snowflake_user>
# database = <test_database>
# schema = <PUBLIC> # Optional: default schema is 'PUBLIC'
# warehouse = <compute_wh> # Required: specify a warehouse
# role = <role_name> # Optional: specify a role
#
# # Authentication (choose one):
# # Option 1: Key-pair authentication (recommended for MFA)
# private_key_path = </path/to/private_key.pem>
# # Or use environment variable SNOWFLAKE_PRIVATE_KEY
#
# # Option 2: Password authentication (for accounts without MFA)
# password = <snowflake_password>
# DuckDB configuration
# [sql_testing.duckdb]
# database = <path/to/database.duckdb> # Optional: defaults to in-memory database
Each database adapter uses a different concept for organizing tables and queries. Understanding the database context - the minimum qualification needed to uniquely identify a table - is crucial for writing mock tables and queries:
| Adapter | Database Context Format | Components | Mock Table Example | Query Example |
|---|---|---|---|---|
| BigQuery | `{project_id}.{dataset_id}` | project + dataset | `"test-project.test_dataset"` | `SELECT * FROM test-project.test_dataset.users` |
| Athena | `{database}` | database only | `"test_db"` | `SELECT * FROM test_db.customers` |
| Redshift | `{database}` | database only | `"test_db"` | `SELECT * FROM test_db.orders` |
| Snowflake | `{database}.{schema}` | database + schema | `"test_db.public"` | `SELECT * FROM test_db.public.products` |
| Trino | `{catalog}.{schema}` | catalog + schema | `"memory.default"` | `SELECT * FROM memory.default.inventory` |
| DuckDB | `{database}` | database only | `"test_db"` | `SELECT * FROM test_db.analytics` |
- Mock Tables: Use hardcoded database contexts in your test mock tables. Don't rely on environment variables - this makes tests predictable and consistent.
- `default_namespace` Parameter: This parameter serves as a namespace resolution context that qualifies unqualified table names in your SQL queries. When creating TestCase instances, use the same database context format as your mock tables.
- Query References: Your SQL queries can use either:
  - Fully qualified names: `SELECT * FROM test-project.test_dataset.users`
  - Unqualified names: `SELECT * FROM users` (qualified using `default_namespace`)
- Case Sensitivity:
  - All SQL Adapters: Table name matching is case-insensitive - you can use lowercase contexts like `"test_db.public"` even if your SQL uses `FROM CUSTOMERS`
  - This follows standard SQL behavior where table names are case-insensitive
The default_namespace parameter is not where your SQL executes - it's the namespace prefix used to resolve unqualified table names in your queries.
How it works:
- Query: `SELECT * FROM users JOIN orders ON users.id = orders.user_id`
- default_namespace: `"test-project.test_dataset"`
- Resolution: `users` → `test-project.test_dataset.users`, `orders` → `test-project.test_dataset.orders`
- Requirement: These resolved names must match your mock tables' `get_qualified_name()` values
Alternative parameter names under consideration:
- `default_namespace` ✅ (most clear about purpose)
- `table_context`
- `namespace_prefix`
Example showing the difference:
# Option 1: Fully qualified table names (default_namespace not used for resolution)
TestCase(
query="SELECT * FROM test-project.test_dataset.users",
default_namespace="test-project.test_dataset", # For consistency, but not used
)
# Option 2: Unqualified table names (default_namespace used for resolution)
TestCase(
query="SELECT * FROM users", # Unqualified
default_namespace="test-project.test_dataset", # Qualifies to: test-project.test_dataset.users
)
# BigQuery Mock Table
class UsersMockTable(BaseMockTable):
def get_database_name(self) -> str:
return "test-project.test_dataset" # project.dataset format
def get_table_name(self) -> str:
return "users"
# Athena Mock Table
class CustomerMockTable(BaseMockTable):
def get_database_name(self) -> str:
return "test_db" # database only
def get_table_name(self) -> str:
return "customers"
# Snowflake Mock Table
class ProductsMockTable(BaseMockTable):
def get_database_name(self) -> str:
return "test_db.public" # database.schema format (lowercase)
def get_table_name(self) -> str:
return "products"
# DuckDB Mock Table
class AnalyticsMockTable(BaseMockTable):
def get_database_name(self) -> str:
return "test_db" # database only
def get_table_name(self) -> str:
return "analytics"- Write a test using one of the flexible patterns:
from dataclasses import dataclass
from datetime import date
from pydantic import BaseModel
from sql_testing_library import sql_test, TestCase
from sql_testing_library.mock_table import BaseMockTable
@dataclass
class User:
user_id: int
name: str
email: str
class UserResult(BaseModel):
user_id: int
name: str
class UsersMockTable(BaseMockTable):
def get_database_name(self) -> str:
return "sqltesting_db"
def get_table_name(self) -> str:
return "users"
# Pattern 1: Define all test data in the decorator
@sql_test(
mock_tables=[
UsersMockTable([
User(1, "Alice", "alice@example.com"),
User(2, "Bob", "bob@example.com")
])
],
result_class=UserResult
)
def test_pattern_1():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="sqltesting_db"
)
# Pattern 2: Define all test data in the TestCase
@sql_test() # Empty decorator
def test_pattern_2():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="sqltesting_db",
mock_tables=[
UsersMockTable([
User(1, "Alice", "alice@example.com"),
User(2, "Bob", "bob@example.com")
])
],
result_class=UserResult
)
# Pattern 3: Mix and match between decorator and TestCase
@sql_test(
mock_tables=[
UsersMockTable([
User(1, "Alice", "alice@example.com"),
User(2, "Bob", "bob@example.com")
])
]
)
def test_pattern_3():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="sqltesting_db",
result_class=UserResult
)
The library supports struct/record types using Python dataclasses or Pydantic models for Athena, Trino, BigQuery, and DuckDB:
from dataclasses import dataclass
from decimal import Decimal
from pydantic import BaseModel
from sql_testing_library import sql_test, TestCase
from sql_testing_library.mock_table import BaseMockTable
# Define nested structs using dataclasses
@dataclass
class Address:
street: str
city: str
state: str
zip_code: str
@dataclass
class Employee:
id: int
name: str
salary: Decimal
address: Address # Nested struct
is_active: bool = True
# Or use Pydantic models
class AddressPydantic(BaseModel):
street: str
city: str
state: str
zip_code: str
class EmployeeResultPydantic(BaseModel):
id: int
name: str
city: str # Extracted from nested struct
# Mock table with struct data
class EmployeesMockTable(BaseMockTable):
def get_database_name(self) -> str:
return "test_db"
def get_table_name(self) -> str:
return "employees"
# Test with struct types
@sql_test(
adapter_type="athena", # or "trino", "bigquery", or "duckdb"
mock_tables=[
EmployeesMockTable([
Employee(
id=1,
name="Alice Johnson",
salary=Decimal("120000.00"),
address=Address(
street="123 Tech Lane",
city="San Francisco",
state="CA",
zip_code="94105"
),
is_active=True
),
Employee(
id=2,
name="Bob Smith",
salary=Decimal("95000.00"),
address=Address(
street="456 Oak Ave",
city="New York",
state="NY",
zip_code="10001"
),
is_active=False
)
])
],
result_class=EmployeeResultPydantic
)
def test_struct_with_dot_notation():
return TestCase(
query="""
SELECT
id,
name,
address.city as city -- Access nested field with dot notation
FROM employees
WHERE address.state = 'CA' -- Use struct fields in WHERE clause
""",
default_namespace="test_db"
)
# You can also query entire structs
@sql_test(
adapter_type="trino", # or "athena", "bigquery", or "duckdb"
mock_tables=[EmployeesMockTable([...])],
result_class=dict # Returns full struct as dict
)
def test_query_full_struct():
return TestCase(
query="SELECT id, name, address FROM employees",
default_namespace="test_db"
)
Struct Type Features:
- Nested Structures: Support for deeply nested structs using dataclasses or Pydantic models
- Dot Notation: Access struct fields using `struct.field` syntax in queries
- Type Safety: Full type conversion between Python objects and SQL ROW types
- NULL Handling: Proper handling of optional struct fields
- WHERE Clause: Use struct fields in filtering conditions
- List of Structs: Full support for `List[StructType]` with array operations (see the sketch below)
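As an illustration of the list-of-structs support noted above, mock rows can carry a list of nested dataclass instances. A minimal sketch (table and field names are illustrative, reusing the mock-table pattern from the struct example above):

```python
from dataclasses import dataclass
from typing import List

from sql_testing_library.mock_table import BaseMockTable


@dataclass
class PhoneNumber:
    kind: str
    number: str


@dataclass
class Contact:
    id: int
    name: str
    phones: List[PhoneNumber]  # list of structs


class ContactsMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "test_db"

    def get_table_name(self) -> str:
        return "contacts"


contacts = ContactsMockTable([
    Contact(id=1, name="Alice", phones=[
        PhoneNumber(kind="mobile", number="555-0100"),
        PhoneNumber(kind="work", number="555-0101"),
    ]),
    Contact(id=2, name="Bob", phones=[]),
])
```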
SQL Type Mapping:
- Python dataclass/Pydantic model → SQL `ROW(field1 type1, field2 type2, ...)`
- Nested structs are fully supported
- All struct values are properly cast to ensure type consistency
- Run with pytest:
# Run all tests
pytest test_users.py
# Run only SQL tests (using the sql_test marker)
pytest -m sql_test
# Exclude SQL tests
pytest -m "not sql_test"
# Run a specific test
pytest test_users.py::test_user_query
# If using Poetry
poetry run pytest test_users.py::test_user_query
If you encounter the error `No [sql_testing] section found in pytest.ini, setup.cfg, or tox.ini`, this typically happens when using IDEs like PyCharm, VS Code, or other development environments that run pytest from a different working directory.
Problem: The library looks for configuration files (pytest.ini, setup.cfg, tox.ini) in the current working directory, but IDEs may run tests from a different location.
Set the SQL_TESTING_PROJECT_ROOT environment variable to point to your project root directory:
In PyCharm:
- Go to Run/Debug Configurations
- Select your test configuration
- In Environment variables, add:
  - Name: `SQL_TESTING_PROJECT_ROOT`
  - Value: `/path/to/your/project/root` (where your `pytest.ini` is located)
In VS Code:
Add to your .vscode/settings.json:
{
"python.testing.pytestArgs": [
"--rootdir=/path/to/your/project/root"
],
"python.envFile": "${workspaceFolder}/.env"
}
Create a `.env` file in your project root:
SQL_TESTING_PROJECT_ROOT=/path/to/your/project/root
Create a `conftest.py` file in your project root directory:
"""
PyTest configuration file to ensure SQL Testing Library can find config
"""
import os
import pytest
def pytest_configure(config):
"""Ensure SQL_TESTING_PROJECT_ROOT is set for IDE compatibility"""
if not os.environ.get('SQL_TESTING_PROJECT_ROOT'):
# Set to current working directory where conftest.py is located
project_root = os.path.dirname(os.path.abspath(__file__))
os.environ['SQL_TESTING_PROJECT_ROOT'] = project_root
print(f"Setting SQL_TESTING_PROJECT_ROOT to: {project_root}")This automatically sets the project root when pytest runs, regardless of the IDE or working directory.
Create a setup.cfg file alongside your pytest.ini:
[tool:pytest]
testpaths = tests
[sql_testing]
adapter = bigquery
[sql_testing.bigquery]
project_id = your-project-id
dataset_id = your_dataset
credentials_path = /path/to/credentials.json
In PyCharm:
- Go to Run/Debug Configurations
- Set Working directory to your project root (where `pytest.ini` is located)
In VS Code:
Ensure your workspace is opened at the project root level where pytest.ini exists.
To verify your configuration is working, run this Python snippet:
from sql_testing_library._pytest_plugin import SQLTestDecorator
decorator = SQLTestDecorator()
try:
project_root = decorator._get_project_root()
print(f"Project root: {project_root}")
config_parser = decorator._get_config_parser()
print(f"Config sections: {config_parser.sections()}")
if 'sql_testing' in config_parser:
adapter = config_parser.get('sql_testing', 'adapter')
print(f"âś… Configuration found! Adapter: {adapter}")
else:
print("❌ No sql_testing section found")
except Exception as e:
print(f"❌ Error: {e}")PyCharm: Often runs pytest from the project parent directory instead of the project root.
- Solution: Set the working directory or use `conftest.py`
VS Code: May not respect the `pytest.ini` location when using the Python extension.
- Solution: Use a `.env` file or set `python.testing.pytestArgs` in settings
Jupyter Notebooks: Running tests in notebooks may not find configuration files.
- Solution: Set the `SQL_TESTING_PROJECT_ROOT` environment variable in the notebook
Docker/Containers: Configuration files may not be mounted or accessible.
- Solution: Ensure config files are included in your Docker build context and set the environment variable
The library supports flexible ways to configure your tests:
- All Config in Decorator: Define all mock tables and result class in the `@sql_test` decorator, with only query and default_namespace in TestCase.
- All Config in TestCase: Use an empty `@sql_test()` decorator and define everything in the TestCase return value.
- Mix and Match: Specify some parameters in the decorator and others in the TestCase.
- Per-Test Database Adapters: Specify which adapter to use for specific tests.
Important notes:
- Parameters provided in the decorator take precedence over those in TestCase
- Either the decorator or TestCase must provide mock_tables and result_class
The adapter specified in the `[sql_testing]` section acts as the default adapter for all tests. When you don't specify an `adapter_type` in your test, it uses this default.
[sql_testing]
adapter = snowflake # This becomes the default for all tests
You can override the default adapter for individual tests:
# Use BigQuery adapter for this test
@sql_test(
adapter_type="bigquery",
mock_tables=[...],
result_class=UserResult
)
def test_bigquery_query():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="sqltesting_db"
)
# Use Athena adapter for this test
@sql_test(
adapter_type="athena",
mock_tables=[...],
result_class=UserResult
)
def test_athena_query():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="test_db"
)
# Use Redshift adapter for this test
@sql_test(
adapter_type="redshift",
mock_tables=[...],
result_class=UserResult
)
def test_redshift_query():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="test_db"
)
# Use Trino adapter for this test
@sql_test(
adapter_type="trino",
mock_tables=[...],
result_class=UserResult
)
def test_trino_query():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="test_db"
)
# Use Snowflake adapter for this test
@sql_test(
adapter_type="snowflake",
mock_tables=[...],
result_class=UserResult
)
def test_snowflake_query():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="test_db"
)
# Use DuckDB adapter for this test
@sql_test(
adapter_type="duckdb",
mock_tables=[...],
result_class=UserResult
)
def test_duckdb_query():
return TestCase(
query="SELECT user_id, name FROM users WHERE user_id = 1",
default_namespace="test_db"
)
The `adapter_type` parameter will use the configuration from the corresponding section in `pytest.ini`, such as `[sql_testing.bigquery]`, `[sql_testing.athena]`, `[sql_testing.redshift]`, `[sql_testing.trino]`, `[sql_testing.snowflake]`, or `[sql_testing.duckdb]`.
Default Adapter Behavior:
- If `adapter_type` is not specified in the test, the library uses the adapter from the `[sql_testing]` section's `adapter` setting
- If no adapter is specified in the `[sql_testing]` section, it defaults to "bigquery"
- Each adapter reads its configuration from the `[sql_testing.<adapter_name>]` section
- Supports Google Cloud BigQuery service
- Uses UNION ALL pattern for CTE creation with complex data types
- Handles authentication via service account or application default credentials
- Supports Amazon Athena service for querying data in S3
- Uses CTAS (CREATE TABLE AS SELECT) for efficient temporary table creation
- Handles large queries by automatically falling back to physical tables
- Supports authentication via AWS credentials or instance profiles
- Supports Amazon Redshift data warehouse service
- Uses CTAS (CREATE TABLE AS SELECT) for efficient temporary table creation
- Takes advantage of Redshift's automatic session-based temporary table cleanup
- Handles large datasets and complex queries with SQL-compliant syntax
- Supports authentication via username and password
- Supports Trino (formerly PrestoSQL) distributed SQL query engine
- Uses CTAS (CREATE TABLE AS SELECT) for efficient temporary table creation
- Provides explicit table cleanup management
- Works with a variety of catalogs and data sources
- Handles large datasets and complex queries with full SQL support
- Supports multiple authentication methods including Basic and JWT
- Supports Snowflake cloud data platform
- Uses CTAS (CREATE TABLE AS SELECT) for efficient temporary table creation
- Creates temporary tables that automatically expire at the end of the session
- Handles large datasets and complex queries with Snowflake's SQL dialect
- Supports authentication via username and password
- Optional support for warehouse, role, and schema specification
- Supports DuckDB embedded analytical database
- Uses CTAS (CREATE TABLE AS SELECT) for efficient temporary table creation
- Fast local database with excellent SQL standards compliance
- Supports both file-based and in-memory databases
- No authentication required - perfect for local development and testing
- Excellent performance for analytical workloads
Default Behavior:
- If `adapter_type` is not specified in the TestCase or decorator, the library will use the adapter specified in the `[sql_testing]` section's `adapter` setting.
- If no adapter is specified in the `[sql_testing]` section, it defaults to "bigquery".
- The library will then look for adapter-specific configuration in the `[sql_testing.<adapter>]` section.
- If the adapter-specific section doesn't exist, it falls back to using the `[sql_testing]` section for backward compatibility.
The project includes a Makefile for common development tasks:
# Install all dependencies
make install
# Run unit tests
make test
# Run linting and type checking
make lint
# Format code
make format
# Run all checks (lint + format check + tests)
make check
# See all available commands
make help
| Command | Description |
|---|---|
| `make install` | Install all dependencies with poetry |
| `make test` | Run unit tests with coverage |
| `make test-unit` | Run unit tests (excludes integration tests) |
| `make test-integration` | Run integration tests (requires DB credentials) |
| `make test-all` | Run all tests (unit + integration) |
| `make test-tox` | Run tests across all Python versions (3.9-3.12) |
| `make lint` | Run ruff and mypy checks |
| `make format` | Format code with black and ruff |
| `make check` | Run all checks (lint + format + tests) |
| `make clean` | Remove build artifacts and cache files |
| `make build` | Build distribution packages |
| `make docs` | Build documentation |
The project supports Python 3.9-3.12. You can test across all versions using:
# Using tox (automatically tests all Python versions)
make test-tox
# Or directly with tox
tox
# Test specific Python version
tox -e py39 # Python 3.9
tox -e py310 # Python 3.10
tox -e py311 # Python 3.11
tox -e py312 # Python 3.12
The project uses comprehensive tools to ensure code quality:
- Ruff for linting and formatting
- Black for code formatting
- Mypy for static type checking
- Pre-commit hooks for automated checks
To set up the development environment:
- Install development dependencies:
  # Using make
  make install
  # Or directly with poetry
  poetry install --all-extras
- Set up pre-commit hooks:
  ./scripts/setup-hooks.sh
This ensures code is automatically formatted, linted, and type-checked on commit.
For more information on code quality standards, see docs/linting.md.
The library includes comprehensive GitHub Actions workflows for automated testing across multiple database platforms:
Automatically runs on every PR and merge to master:
- Unit Tests: Mock-based tests in `tests/` (free)
- Integration Tests: Real database tests in `tests/integration/` (minimal cost)
- Cleanup: Automatic resource cleanup
- Real AWS Athena tests with automatic S3 setup and cleanup
- Cost: ~$0.05 per test run
- Setup Guide: Athena CI/CD Setup
Required Setup:
- Secrets: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_ATHENA_OUTPUT_LOCATION`
- Variables: `AWS_ATHENA_DATABASE`, `AWS_REGION` (optional)
Validation:
python scripts/validate-athena-setup.py
- Real GCP BigQuery tests with dataset creation and cleanup
- Cost: Minimal (within free tier for most use cases)
- Setup Guide: BigQuery CI/CD Setup
Required Setup:
- Secrets: `GCP_SA_KEY`, `GCP_PROJECT_ID`
Validation:
python scripts/validate-bigquery-setup.py
- Real AWS Redshift Serverless tests with namespace/workgroup creation
- Cost: ~$0.50-$1.00 per test run (free tier: $300 credit for new accounts)
- Setup Guide: Redshift CI/CD Setup
Required Setup:
- Secrets: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `REDSHIFT_ADMIN_PASSWORD`
- Variables: `AWS_REGION`, `REDSHIFT_NAMESPACE`, `REDSHIFT_WORKGROUP` (optional)
- IAM Permissions: Includes EC2 permissions for automatic security group configuration
Validation:
python scripts/validate-redshift-setup.py
Manual Testing:
# Create Redshift cluster (automatically configures security groups for connectivity)
python scripts/manage-redshift-cluster.py create
# Get connection details and psql command
python scripts/manage-redshift-cluster.py endpoint
# Run integration tests
poetry run pytest tests/integration/test_redshift_integration.py -v
# Clean up resources (automatically waits for proper deletion order)
python scripts/manage-redshift-cluster.py destroy
- Real Trino tests using Docker with Memory connector
- Cost: Free (runs locally with Docker)
- Setup Guide: Trino CI/CD Setup
Required Setup:
- Docker for containerized Trino server
- No additional secrets or variables required
Manual Testing:
# Run integration tests (automatically manages Docker containers)
poetry run pytest tests/integration/test_trino_integration.py -v
- Real Snowflake tests using cloud data platform
- Cost: Compute time charges based on warehouse size
- Setup Guide: Snowflake CI/CD Setup
Required Setup:
- Secrets: `SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_USER`, `SNOWFLAKE_PASSWORD`
- Variables: `SNOWFLAKE_DATABASE`, `SNOWFLAKE_WAREHOUSE`, `SNOWFLAKE_ROLE` (optional)
Validation:
python scripts/validate-snowflake-setup.py
Manual Testing:
# Run integration tests
poetry run pytest tests/integration/test_snowflake_integration.py -v
The library automatically:
- Parses SQL to find table references
- Resolves unqualified table names with database context
- Injects mock data via CTEs or temp tables
- Deserializes results to typed Python objects
For detailed usage and configuration options, see the example files included.
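As a rough illustration of the injection step, a CTE-mode rewrite of the Quick Start query might look conceptually like this (a sketch only; the exact SQL differs per adapter, e.g. BigQuery builds the CTE with a UNION ALL pattern as noted in the adapter notes above):

```python
# Conceptual sketch only: the SQL actually generated by each adapter differs.
original_query = "SELECT user_id, name FROM users WHERE user_id = 1"

cte_rewritten_query = """
WITH users AS (
    SELECT 1 AS user_id, 'Alice' AS name, 'alice@example.com' AS email
    UNION ALL
    SELECT 2 AS user_id, 'Bob' AS name, 'bob@example.com' AS email
)
SELECT user_id, name FROM users WHERE user_id = 1
"""
```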
SQL Testing Library works seamlessly with Mocksmith for automatic test data generation. Mocksmith can reduce your test setup code by ~70% while providing more realistic test data.
Install mocksmith with: pip install mocksmith[mock,pydantic]
import random
from decimal import Decimal

# Without Mocksmith - Manual data creation
customers = []
for i in range(100):
customers.append(Customer(
id=i + 1,
name=f"Customer {i + 1}",
email=f"customer{i + 1}@test.com",
balance=Decimal(str(random.uniform(0, 10000)))
))
# With Mocksmith - Automatic realistic data
from mocksmith import mockable, Varchar, Integer, Money
@mockable
@dataclass
class Customer:
id: Integer()
name: Varchar(100)
email: Varchar(255)
balance: Money()
customers = [Customer.mock() for _ in range(100)]
See the Mocksmith Integration Guide and examples for detailed usage patterns.
The library has a few known limitations that are planned to be addressed in future updates:
- Redshift: Struct types are not supported due to lack of native struct/record types (uses SUPER type for JSON)
- Snowflake: Struct types are not supported due to lack of native struct/record types (uses VARIANT type for JSON)
- BigQuery: Does not support nested arrays (arrays of arrays). This is a BigQuery database limitation, not a library limitation. (See TODO in `test_struct_types_integration.py:test_nested_lists`)
- Add support for more SQL dialects
- Improve error handling for malformed SQL
- Enhance documentation with more examples
- Python >= 3.9
- sqlglot >= 18.0.0
- pydantic >= 2.0.0
- Database-specific clients:
- google-cloud-bigquery for BigQuery
- boto3 for Athena
- psycopg2-binary for Redshift
- trino for Trino
- snowflake-connector-python for Snowflake
- duckdb for DuckDB