-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat(langgraph+lsd): custom encryption at rest #1715
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
87dd364
096e36a
0afab9e
03be1c8
afc19e2
c7ee0d8
37fbc1d
6781cbd
35cd9e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,381 @@ | ||||||
| --- | ||||||
| title: Add encryption at-rest | ||||||
| sidebarTitle: Encryption at-rest | ||||||
| --- | ||||||
|
|
||||||
| LangGraph Platform supports encryption at-rest for checkpoint data and metadata. You can choose between basic encryption with a single key or custom encryption for advanced use cases. | ||||||
|
|
||||||
| <Note> | ||||||
| Encryption at-rest is available for LangGraph Platform deployments (Python graphs only). | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: This is in the Deployments section so I would just call out the python piece. Also the API version needed similar to here: https://docs.langchain.com/langsmith/custom-lifespan#how-to-add-custom-lifespan-events
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Or LangSmith deployments?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. willfix |
||||||
| </Note> | ||||||
|
|
||||||
| ## Choosing an encryption method | ||||||
|
|
||||||
| | Method | What's encrypted | Use case | | ||||||
| |--------|------------------|----------| | ||||||
| | **Basic encryption** | Checkpoint blobs only | Single static key, automatic AES encryption | | ||||||
| | **Custom encryption** | Checkpoints + JSON fields | Per-tenant keys, KMS integration, selective field encryption | | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Or something else to show these resources. JSON fields feels too vague.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the "metadata" ambiguity is brutal. i think naming the models, as you've suggested, instead of naming the fields is probably best here, but it's also annoyingly vague. |
||||||
|
|
||||||
| ## Basic encryption | ||||||
|
|
||||||
| For simple encryption with a single static key, set the `LANGGRAPH_AES_KEY` environment variable. LangGraph will automatically encrypt checkpoint blobs using AES. | ||||||
|
|
||||||
| 1. Add `pycryptodome` to your dependencies in `langgraph.json`: | ||||||
| ```json | ||||||
| { | ||||||
| "dependencies": [".", "pycryptodome"], | ||||||
| "graphs": { | ||||||
| "agent": "./agent.py:graph" | ||||||
| } | ||||||
| } | ||||||
| ``` | ||||||
|
|
||||||
| 2. Set the `LANGGRAPH_AES_KEY` environment variable to a 16, 24, or 32-byte key (for AES-128, AES-192, or AES-256 respectively). | ||||||
|
|
||||||
| Basic encryption only encrypts checkpoint blobs. Metadata fields remain unencrypted and searchable. | ||||||
|
|
||||||
| ## Custom encryption | ||||||
|
|
||||||
| Use custom encryption when you need: | ||||||
|
|
||||||
| - **Per-tenant key isolation** — different encryption keys for different customers | ||||||
| - **KMS integration** — AWS KMS, Google Cloud KMS, or HashiCorp Vault for key management, rotation, and audit logging | ||||||
| - **Selective field encryption** — encrypt sensitive metadata fields while keeping others searchable | ||||||
|
|
||||||
| ### How it works | ||||||
|
|
||||||
| 1. [Configure](#configuration) the encryption module path in `langgraph.json` | ||||||
| 2. [Define your encryption module](#defining-your-encryption-module) with handlers for blob and JSON encryption | ||||||
| 3. [Pass encryption context](#passing-encryption-context) (like tenant ID) via the `X-Encryption-Context` header | ||||||
| 4. LangGraph calls your handlers before storing and after retrieving data | ||||||
|
|
||||||
| For production deployments with key rotation and audit logging, see [Envelope encryption with AWS Encryption SDK](#envelope-encryption-with-aws-encryption-sdk). | ||||||
|
|
||||||
| ### Configuration | ||||||
|
|
||||||
| Add your encryption module to `langgraph.json`: | ||||||
|
|
||||||
| ```json | ||||||
| { | ||||||
| "dependencies": ["."], | ||||||
| "graphs": { | ||||||
| "agent": "./agent.py:graph" | ||||||
| }, | ||||||
| "encryption": { | ||||||
| "path": "./encryption.py:encryption" | ||||||
| } | ||||||
| } | ||||||
| ``` | ||||||
|
|
||||||
| <Note> | ||||||
| If you're already using `LANGGRAPH_AES_KEY`, keep it configured—custom encryption replaces AES for new writes, but existing AES-encrypted data will still be readable. | ||||||
| </Note> | ||||||
|
|
||||||
| ### Defining your encryption module | ||||||
|
|
||||||
| #### Blob encryption (checkpoints) | ||||||
|
|
||||||
| Blob handlers encrypt checkpoint data—the serialized state from graph execution. Here's a simplified example using per-tenant keys with [Fernet](https://cryptography.io/en/latest/fernet/) (a symmetric encryption scheme from the `cryptography` library): | ||||||
|
|
||||||
| ```python | ||||||
| import os | ||||||
| from cryptography.fernet import Fernet | ||||||
| from langgraph_sdk import Encryption, EncryptionContext | ||||||
|
|
||||||
| encryption = Encryption() | ||||||
|
|
||||||
| # In production, fetch from a secrets manager | ||||||
| TENANT_KEYS = { | ||||||
| "tenant-a": Fernet(os.environ["TENANT_A_KEY"]), | ||||||
| "tenant-b": Fernet(os.environ["TENANT_B_KEY"]), | ||||||
| } | ||||||
|
|
||||||
|
|
||||||
| def _get_fernet(ctx: EncryptionContext) -> Fernet: | ||||||
| tenant_id = ctx.metadata.get("tenant_id") | ||||||
| if not tenant_id or tenant_id not in TENANT_KEYS: | ||||||
| raise ValueError(f"Unknown tenant: {tenant_id}") | ||||||
| return TENANT_KEYS[tenant_id] | ||||||
|
|
||||||
|
|
||||||
| @encryption.encrypt.blob | ||||||
| async def encrypt_blob(ctx: EncryptionContext, data: bytes) -> bytes: | ||||||
| return _get_fernet(ctx).encrypt(data) | ||||||
|
|
||||||
|
|
||||||
| @encryption.decrypt.blob | ||||||
| async def decrypt_blob(ctx: EncryptionContext, data: bytes) -> bytes: | ||||||
| return _get_fernet(ctx).decrypt(data) | ||||||
| ``` | ||||||
|
|
||||||
| The `ctx.metadata` dict comes from the `X-Encryption-Context` header and is stored alongside encrypted data, so the correct key is used on decryption. | ||||||
|
|
||||||
| #### JSON encryption (metadata) | ||||||
|
|
||||||
| JSON handlers encrypt structured data like thread metadata, assistant context, and run kwargs. Unlike blob encryption, you choose which fields to encrypt—keeping some unencrypted for search and filtering. | ||||||
|
|
||||||
| <Warning> | ||||||
| **Encrypted fields cannot be searched or filtered.** Design your metadata schema so that fields you need to query remain unencrypted. | ||||||
| </Warning> | ||||||
|
|
||||||
| <Note> | ||||||
| **Migration consideration:** Unlike blob encryption, JSON handlers have no built-in way to detect whether a field value is already encrypted. We recommend storing all encrypted values under a single key (e.g., `__encrypted__`)—if the key exists, decrypt it; if not, data is unencrypted and passes through unchanged. The examples below use this pattern. | ||||||
| </Note> | ||||||
|
|
||||||
| Common fields to leave **unencrypted** for search and filtering: | ||||||
|
|
||||||
| - User-defined fields for access control queries (e.g., `tenant_id`, `owner`) | ||||||
| - `run_id`, `thread_id`, `graph_id`, `assistant_id`, `user_id`, `checkpoint_id` — system-populated identifiers | ||||||
| - `source`, `step`, `parents`, `run_attempt` — system-populated execution state | ||||||
| - `langgraph_version`, `langgraph_api_version`, `langgraph_plan`, `langgraph_host`, `langgraph_api_url`, `langgraph_request_id`, `langgraph_auth_user`, `langgraph_auth_user_id`, `langgraph_auth_permissions` — system-populated platform metadata | ||||||
|
|
||||||
| ```python | ||||||
| import json | ||||||
| import os | ||||||
| from cryptography.fernet import Fernet | ||||||
| from langgraph_sdk import Encryption, EncryptionContext | ||||||
|
|
||||||
| encryption = Encryption() | ||||||
|
|
||||||
| TENANT_KEYS = { | ||||||
| "tenant-a": Fernet(os.environ["TENANT_A_KEY"]), | ||||||
| "tenant-b": Fernet(os.environ["TENANT_B_KEY"]), | ||||||
| } | ||||||
|
|
||||||
| SKIP_FIELDS = { | ||||||
| "tenant_id", "owner", | ||||||
| "run_id", "thread_id", "graph_id", "assistant_id", "user_id", "checkpoint_id", | ||||||
| "source", "step", "parents", "run_attempt", | ||||||
| "langgraph_version", "langgraph_api_version", "langgraph_plan", "langgraph_host", | ||||||
| "langgraph_api_url", "langgraph_request_id", "langgraph_auth_user", | ||||||
| "langgraph_auth_user_id", "langgraph_auth_permissions", | ||||||
| } | ||||||
| ENCRYPTED_KEY = "__encrypted__" | ||||||
|
|
||||||
|
|
||||||
| def _get_fernet(ctx: EncryptionContext) -> Fernet: | ||||||
| tenant_id = ctx.metadata.get("tenant_id") | ||||||
| if not tenant_id or tenant_id not in TENANT_KEYS: | ||||||
| raise ValueError(f"Unknown tenant: {tenant_id}") | ||||||
| return TENANT_KEYS[tenant_id] | ||||||
|
|
||||||
|
|
||||||
| @encryption.encrypt.json | ||||||
| async def encrypt_json(ctx: EncryptionContext, data: dict) -> dict: | ||||||
| fernet = _get_fernet(ctx) | ||||||
| to_encrypt = {k: v for k, v in data.items() if k not in SKIP_FIELDS and v is not None} | ||||||
| if not to_encrypt: | ||||||
| return data | ||||||
| result = {k: v for k, v in data.items() if k in SKIP_FIELDS} | ||||||
| result[ENCRYPTED_KEY] = fernet.encrypt(json.dumps(to_encrypt).encode()).decode() | ||||||
| return result | ||||||
|
|
||||||
|
|
||||||
| @encryption.decrypt.json | ||||||
| async def decrypt_json(ctx: EncryptionContext, data: dict) -> dict: | ||||||
| if ENCRYPTED_KEY not in data: | ||||||
| return data # Not encrypted, pass through unchanged | ||||||
| fernet = _get_fernet(ctx) | ||||||
| encrypted_blob = data[ENCRYPTED_KEY] | ||||||
| decrypted = json.loads(fernet.decrypt(encrypted_blob.encode()).decode()) | ||||||
| result = {k: v for k, v in data.items() if k != ENCRYPTED_KEY} | ||||||
| result.update(decrypted) | ||||||
| return result | ||||||
| ``` | ||||||
|
|
||||||
| #### What gets encrypted | ||||||
|
|
||||||
| **JSON handlers** (`@encryption.encrypt.json` / `@encryption.decrypt.json`): | ||||||
|
|
||||||
| - `thread.metadata`, `thread.values` | ||||||
| - `assistant.metadata`, `assistant.context` | ||||||
| - `run.metadata`, `run.kwargs` | ||||||
| - `cron.metadata`, `cron.payload` | ||||||
|
|
||||||
| **Blob handlers** (`@encryption.encrypt.blob` / `@encryption.decrypt.blob`): | ||||||
|
|
||||||
| - Checkpoint blobs (graph execution state) | ||||||
|
|
||||||
| #### Model-specific handlers | ||||||
|
|
||||||
| Register different handlers for different model types using `@encryption.encrypt.json.assistant`, `@encryption.encrypt.json.run`, etc. Use `ctx.field` to vary behavior by field: | ||||||
|
|
||||||
| ```python | ||||||
| from langgraph_sdk import Encryption, EncryptionContext | ||||||
|
|
||||||
| encryption = Encryption() | ||||||
|
|
||||||
| @encryption.encrypt.json | ||||||
| async def default_encrypt(ctx: EncryptionContext, data: dict) -> dict: | ||||||
| return encrypt_with_skip_fields(data, SKIP_FIELDS) | ||||||
|
|
||||||
| @encryption.encrypt.json.assistant | ||||||
| async def encrypt_assistant(ctx: EncryptionContext, data: dict) -> dict: | ||||||
| if ctx.field == "context": | ||||||
| # Assistant context may contain API keys or system prompts—encrypt everything | ||||||
| return encrypt_with_skip_fields(data, skip_fields=set()) | ||||||
| # Assistant metadata—skip system fields | ||||||
| return encrypt_with_skip_fields(data, SKIP_FIELDS) | ||||||
|
|
||||||
| @encryption.decrypt.json | ||||||
| async def default_decrypt(ctx: EncryptionContext, data: dict) -> dict: | ||||||
| return decrypt_encrypted_fields(data) | ||||||
|
|
||||||
| @encryption.decrypt.json.assistant | ||||||
| async def decrypt_assistant(ctx: EncryptionContext, data: dict) -> dict: | ||||||
| return decrypt_encrypted_fields(data) | ||||||
| ``` | ||||||
|
|
||||||
| Supported model types: `thread`, `assistant`, `run`, `cron`, `checkpoint`. | ||||||
|
|
||||||
| #### Deriving context from authentication | ||||||
|
|
||||||
| Instead of passing `X-Encryption-Context` explicitly, derive encryption context from the authenticated user: | ||||||
|
|
||||||
| ```python | ||||||
| from langgraph_sdk import Encryption, EncryptionContext | ||||||
| from starlette.authentication import BaseUser | ||||||
|
|
||||||
| encryption = Encryption() | ||||||
|
|
||||||
| @encryption.context | ||||||
| async def get_encryption_context(user: BaseUser, ctx: EncryptionContext) -> dict: | ||||||
| return { | ||||||
| **ctx.metadata, | ||||||
| "tenant_id": user["tenant_id"], | ||||||
| } | ||||||
| ``` | ||||||
|
|
||||||
| This handler runs once per request after authentication. The returned dict becomes `ctx.metadata` for all encryption operations in that request. | ||||||
|
|
||||||
| ### Passing encryption context | ||||||
|
|
||||||
| Pass encryption context via the `X-Encryption-Context` header. The context is arbitrary data that you define—you control the schema and can include any fields your encryption logic needs (e.g., `tenant_id`, `key_version`). The context is available in your handlers as `ctx.metadata` and is stored alongside encrypted data for use during decryption. | ||||||
|
|
||||||
| ```python | ||||||
| import base64 | ||||||
| import json | ||||||
| from langgraph_sdk import get_client | ||||||
|
|
||||||
| encryption_context = base64.b64encode( | ||||||
| json.dumps({"tenant_id": "tenant-a"}).encode() | ||||||
| ).decode() | ||||||
|
|
||||||
| client = get_client(url="http://localhost:2024") | ||||||
|
|
||||||
| result = await client.runs.wait( | ||||||
| thread_id=None, | ||||||
| assistant_id="agent", | ||||||
| input={"messages": [{"role": "user", "content": "Hello"}]}, | ||||||
| headers={"X-Encryption-Context": encryption_context}, | ||||||
| ) | ||||||
| ``` | ||||||
|
|
||||||
| <Note> | ||||||
| The encryption context is stored with encrypted data. On decryption, it's automatically restored—callers don't need to pass the header when reading. | ||||||
| </Note> | ||||||
|
|
||||||
| ### Envelope encryption with AWS Encryption SDK | ||||||
|
|
||||||
| For production deployments on AWS, use the [AWS Encryption SDK](https://docs.aws.amazon.com/encryption-sdk/latest/developer-guide/python.html) with AWS KMS, or an equivalent within your cloud provider. This approach: | ||||||
|
|
||||||
| - Handles envelope encryption automatically (no manual key packing) | ||||||
| - Provides key rotation and audit logging | ||||||
| - Binds ciphertext to encryption context (tenant isolation) | ||||||
| - Caches data keys locally to avoid repeated KMS calls, latency and rate limits | ||||||
|
|
||||||
| #### Complete example | ||||||
|
|
||||||
| ```python | ||||||
| import base64 | ||||||
| import json | ||||||
| import os | ||||||
|
|
||||||
| import aws_encryption_sdk | ||||||
| from aws_encryption_sdk import ( | ||||||
| CachingCryptoMaterialsManager, | ||||||
| CommitmentPolicy, | ||||||
| LocalCryptoMaterialsCache, | ||||||
| StrictAwsKmsMasterKeyProvider, | ||||||
| ) | ||||||
| from langgraph_sdk import Encryption, EncryptionContext | ||||||
|
|
||||||
| encryption = Encryption() | ||||||
|
|
||||||
| # The SDK uses envelope encryption: one KMS API call generates a data key, | ||||||
| # then encrypts/decrypts locally. The cache reuses data keys across operations. | ||||||
| client = aws_encryption_sdk.EncryptionSDKClient( | ||||||
| commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT | ||||||
| ) | ||||||
| key_provider = StrictAwsKmsMasterKeyProvider(key_ids=[os.environ["KMS_KEY_ARN"]]) | ||||||
| cache = LocalCryptoMaterialsCache(capacity=100) | ||||||
| cmm = CachingCryptoMaterialsManager( | ||||||
| master_key_provider=key_provider, | ||||||
| cache=cache, | ||||||
| max_age=300.0, | ||||||
| max_messages_encrypted=100, | ||||||
| ) | ||||||
|
|
||||||
| SKIP_FIELDS = { | ||||||
| "tenant_id", "owner", | ||||||
| "run_id", "thread_id", "graph_id", "assistant_id", "user_id", "checkpoint_id", | ||||||
| "source", "step", "parents", "run_attempt", | ||||||
| "langgraph_version", "langgraph_api_version", "langgraph_plan", "langgraph_host", | ||||||
| "langgraph_api_url", "langgraph_request_id", "langgraph_auth_user", | ||||||
| "langgraph_auth_user_id", "langgraph_auth_permissions", | ||||||
| } | ||||||
| ENCRYPTED_KEY = "__encrypted__" | ||||||
|
|
||||||
|
|
||||||
| @encryption.encrypt.blob | ||||||
| async def encrypt_blob(ctx: EncryptionContext, data: bytes) -> bytes: | ||||||
| ciphertext, _ = client.encrypt( | ||||||
| source=data, | ||||||
| materials_manager=cmm, | ||||||
| encryption_context={"tenant_id": ctx.metadata["tenant_id"]}, | ||||||
| ) | ||||||
| return ciphertext | ||||||
|
|
||||||
|
|
||||||
| @encryption.decrypt.blob | ||||||
| async def decrypt_blob(ctx: EncryptionContext, data: bytes) -> bytes: | ||||||
| plaintext, _ = client.decrypt(source=data, key_provider=key_provider) | ||||||
| return plaintext | ||||||
|
|
||||||
|
|
||||||
| @encryption.encrypt.json | ||||||
| async def encrypt_json(ctx: EncryptionContext, data: dict) -> dict: | ||||||
| to_encrypt = {k: v for k, v in data.items() if k not in SKIP_FIELDS and v is not None} | ||||||
| if not to_encrypt: | ||||||
| return data | ||||||
| ciphertext, _ = client.encrypt( | ||||||
| source=json.dumps(to_encrypt).encode(), | ||||||
| materials_manager=cmm, | ||||||
| encryption_context={"tenant_id": ctx.metadata["tenant_id"]}, | ||||||
| ) | ||||||
| result = {k: v for k, v in data.items() if k in SKIP_FIELDS} | ||||||
| result[ENCRYPTED_KEY] = base64.b64encode(ciphertext).decode() | ||||||
| return result | ||||||
|
|
||||||
|
|
||||||
| @encryption.decrypt.json | ||||||
| async def decrypt_json(ctx: EncryptionContext, data: dict) -> dict: | ||||||
| if ENCRYPTED_KEY not in data: | ||||||
| return data # Not encrypted, pass through unchanged | ||||||
| ciphertext = base64.b64decode(data[ENCRYPTED_KEY]) | ||||||
| plaintext, _ = client.decrypt(source=ciphertext, key_provider=key_provider) | ||||||
| decrypted = json.loads(plaintext.decode()) | ||||||
| result = {k: v for k, v in data.items() if k != ENCRYPTED_KEY} | ||||||
| result.update(decrypted) | ||||||
| return result | ||||||
| ``` | ||||||
|
|
||||||
| The `encryption_context` is cryptographically bound to the ciphertext via KMS—decryption fails if the context doesn't match. The context is embedded in the ciphertext, so decrypt handlers don't need to reference `ctx.metadata`. | ||||||
|
|
||||||
| #### Key rotation | ||||||
|
|
||||||
| KMS handles master key rotation automatically. When you enable automatic rotation on your KMS key, old encrypted data keys can still be decrypted while new operations use the rotated key material. No re-encryption of existing data is required. | ||||||
|
|
||||||
| ## Related | ||||||
|
|
||||||
| - [Custom authentication](/langsmith/custom-auth) | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.