fix: lazy persistence #30
base: dev/0.7.1
Pull request overview
This PR refactors the orchestrator's persistence layer to use lazy/periodic persistence instead of immediate persistence, aligning with Apify SDK's useAutoSavedValue behavior. This change enables synchronous tracking updates and improves performance at the cost of potentially outdated values in case of crashes.
Key changes:
- Refactored tracking logic from a monolithic `RunsTracker` to separate `CurrentRunTracker`, `FailedRunHistoryTracker`, and `RunTracker` classes
- Extracted encryption logic into a dedicated `utils/encryption.ts` module
- Implemented lazy persistence via `Actor.on('persistState')` event in `EncryptedKeyValueStore`
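
For illustration, a minimal sketch of the lazy-persistence idea in the last bullet: updates touch only an in-memory cache, and the backend is written when a `persistState` event fires. `TinyEmitter`, `LazyStore`, `backendWrites`, and the `Map` backend are hypothetical stand-ins, not the PR's `EncryptedKeyValueStore` or the Apify event API.

```typescript
// Hypothetical stand-in for the SDK event source; this is not the Apify API.
class TinyEmitter {
    private listeners = new Map<string, Array<() => void>>();

    on(event: string, listener: () => void): void {
        const registered = this.listeners.get(event) ?? [];
        registered.push(listener);
        this.listeners.set(event, registered);
    }

    emit(event: string): void {
        (this.listeners.get(event) ?? []).forEach((listener) => listener());
    }
}

// Hypothetical store: writes go to memory, the backend is touched only on flush.
class LazyStore {
    private cache = new Map<string, unknown>();
    public backendWrites = 0; // counter kept only to make the behavior observable

    constructor(events: TinyEmitter, private readonly backend: Map<string, string>) {
        events.on('persistState', this.flush.bind(this));
    }

    set(key: string, value: unknown): void {
        this.cache.set(key, value); // synchronous, no I/O
    }

    private flush(): void {
        this.cache.forEach((value, key) => {
            // Serialization (and, in the PR, encryption) would happen only here.
            this.backend.set(key, JSON.stringify(value));
            this.backendWrites += 1;
        });
    }
}

const lazyEvents = new TinyEmitter();
const lazyBackend = new Map<string, string>();
const lazyStore = new LazyStore(lazyEvents, lazyBackend);

lazyStore.set('runs', { status: 'RUNNING' });
lazyStore.set('runs', { status: 'SUCCEEDED' }); // overwrites the cached value only
const persistedBeforeFlush = lazyBackend.has('runs');

lazyEvents.emit('persistState');
const persistedAfterFlush = lazyBackend.get('runs');
```

Two updates result in a single backend write, which is the trade-off the overview describes: fewer writes, but anything set after the last flush is lost on a crash.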
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `src/utils/encryption.ts` | New module isolating encryption/decryption functions with `EncryptionKey` interface |
| `src/utils/key-value-store.ts` | Refactored to `EncryptedKeyValueStore` class with lazy persistence via `persistState` event |
| `src/utils/persist.ts` | Removed - persistence logic absorbed by new key-value-store implementation |
| `src/tracking/run-tracker.ts` | Main tracker orchestrating current and failed run tracking |
| `src/tracking/current-run-tracker.ts` | Manages active run state with in-memory updates |
| `src/tracking/failed-run-history-tracker.ts` | Tracks history of failed runs |
| `src/tracking/builder.ts` | Factory for constructing tracker instances with appropriate storage backends |
| `src/utils/context.ts` | Added `GlobalContext` interface, renamed `runsTracker` to `runTracker` |
| `src/tracker.ts` | Removed - split into multiple tracking modules |
| `src/index.ts` | Updated to use `buildRunTrackerForOrchestrator` and new context structure |
| `src/clients/*.ts` | Updated to use `runTracker` instead of `runsTracker` with synchronous updates |
| `test/utils/encryption.test.ts` | New tests for encryption utilities |
| `test/utils/key-value-store.test.ts` | Rewritten tests for `EncryptedKeyValueStore` class |
| `test/tracking/*.test.ts` | New test files for split tracker components |
| `test/_helpers/*.ts` | New test helpers for consistent test setup |
| `test/clients/*.test.ts` | Updated to use new test helpers and `runTracker` |

     /**
      * @deprecated `runTracker` should not be in the global context, because there is one tracker per Apify client.
      * TODO: Remove or replace.
      */
     export interface OrchestratorContext {
         logger: Logger;
    -    runsTracker: RunsTracker;
    +    runTracker: RunTracker;
     }

I plan to get rid of this in the next refactor, when I change the scheduling logic and need to touch all the clients that use the `OrchestratorContext`.
Pull request overview
Copilot reviewed 35 out of 35 changed files in this pull request and generated 6 comments.
halvko left a comment
I may be missing something, but to me it seems like the storage is needlessly complicated - we've agreed to talk about it tomorrow 😌
src/utils/key-value-store.ts
Outdated

    const value = await this.getValue<T>(key, defaultValue);

    return JSON.parse(Buffer.from(decrypted).toString()) as T;
    }
    // Check the cache again to avoid creating multiple states for the same key in concurrent scenarios.
    cachedValue = this.cache.get(key) as T;
    if (cachedValue) return cachedValue;

    class EncryptedKeyValueStore extends KeyValueStore {
    private cryptSecret: string;
    protected kvStore: KeyValueStore;
    this.cache.set(key, value);

I would probably prefer to do a lock/signal here
I replaced the double cache check with pending promises in this commit: 8ac9fad
a564749 to 503f05d
halvko left a comment
I must admit I went over the test code a little bit quickly, but all looks good to me

        private readonly logger: Logger,
        private readonly encryptionKey: EncryptionKey,
    ) {
        Actor.on('persistState', this.persistCache.bind(this));

I'm surprised this works 😅 especially since as far as I can see persistCache doesn't take any arguments?
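
On the arguments question, a small sketch of why a zero-argument method can be registered as a listener: JavaScript ignores extra call arguments, and TypeScript accepts a function with fewer parameters where a listener type expects more. `PayloadListener`, `SketchStore`, and the payload shape are hypothetical and only illustrate the language rule, not the Apify SDK's event signature.

```typescript
// Hypothetical listener type that receives a payload.
type PayloadListener = (payload: { isMigrating: boolean }) => void;

class SketchStore {
    public flushCount = 0;

    // Takes no arguments, like `persistCache` in the snippet above.
    persistCache(): void {
        this.flushCount += 1;
    }
}

const registeredListeners: PayloadListener[] = [];
const sketchStore = new SketchStore();

// Type-checks: a `() => void` function is assignable to `PayloadListener`.
registeredListeners.push(sketchStore.persistCache.bind(sketchStore));

// The emitter still passes a payload; the bound method simply ignores it.
registeredListeners.forEach((listener) => listener({ isMigrating: false }));
```

The `bind(this)` is what matters here: without it, `this` inside `persistCache` would not refer to the store when the emitter invokes the listener.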

    if (cachedValue) return cachedValue;

    const pendingOperation = this.pendingOperations.get(key) as Promise<T> | undefined;
    if (pendingOperation) return await pendingOperation;

Is it ok to await the same promise multiple times like this?
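
As a quick check of the language semantics behind this question: a promise settles once, and every `await` on it (sequential or concurrent) observes the same settled value without re-running the executor. The names below are illustrative only.

```typescript
let executorRuns = 0;

// The executor runs once, when the promise is created.
const sharedPromise = new Promise<number>((resolve) => {
    executorRuns += 1;
    resolve(42);
});

async function awaitRepeatedly(): Promise<number[]> {
    const first = await sharedPromise;
    const second = await sharedPromise; // same settled value, no re-execution
    const [third, fourth] = await Promise.all([sharedPromise, sharedPromise]);
    return [first, second, third, fourth];
}

const repeatedResults = awaitRepeatedly();
```

The same holds for rejection: each awaiter of a rejected promise receives the same error, which matches the `.catch` that rethrows in the snippet above.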

    const pendingOperation = this.pendingOperations.get(key) as Promise<T> | undefined;
    if (pendingOperation) return await pendingOperation;

    const operation = this.getValue<T>(key, defaultValue)
        .then((value) => {
            this.cache.set(key, value);
            this.pendingOperations.delete(key);
            return value;
        })
        .catch((error) => {
            this.pendingOperations.delete(key);
            throw error;
        });

    this.pendingOperations.set(key, operation);

I would want to denote that there is a critical section here - any await point will lead to a race condition
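
One way to denote it, sketched under assumptions: keep the lookup and the `pending.set` in the same synchronous stretch and mark it with a comment. `DedupeSketch`, `load`, and `loadCount` are hypothetical and stand in for the real store and its storage read.

```typescript
class DedupeSketch {
    private cache = new Map<string, object>();
    private pending = new Map<string, Promise<object>>();
    public loadCount = 0; // counts simulated storage reads

    // Hypothetical loader standing in for the real (asynchronous) storage read.
    private async load(key: string): Promise<object> {
        this.loadCount += 1;
        await Promise.resolve(); // simulate an asynchronous boundary
        return { key };
    }

    useState(key: string): Promise<object> {
        const cached = this.cache.get(key);
        if (cached) return Promise.resolve(cached);

        const inFlight = this.pending.get(key);
        if (inFlight) return inFlight;

        // CRITICAL SECTION: no `await` may appear between the lookups above and
        // `pending.set` below, otherwise two callers could both start a load.
        const operation = this.load(key)
            .then((value) => {
                this.cache.set(key, value);
                this.pending.delete(key);
                return value;
            })
            .catch((error) => {
                this.pending.delete(key);
                throw error;
            });
        this.pending.set(key, operation);
        // END CRITICAL SECTION
        return operation;
    }
}

const dedupe = new DedupeSketch();
const concurrentReads = Promise.all([dedupe.useState('runs'), dedupe.useState('runs')]);
```

Making the method non-`async` in this sketch is a design choice: it makes it harder to introduce an `await` inside the guarded region by accident.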

    const defaultTrackedRuns = getDefaultTrackedRuns();
    const storageKey = `${options?.storagePrefix ?? ''}${TRACKED_RUNS_KEY}`;
    const trackedRuns =
        (await options?.storage?.useState<TrackedRuns>(storageKey, defaultTrackedRuns)) ?? defaultTrackedRuns;

And we are guaranteed that it is the same underlying object which is returned, so even if we create two of these they will always be synchronized...
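
A sketch of the property being relied on, assuming the store hands back the cached object by reference: two holders of the same key see each other's mutations. `useStateSketch`, `TrackedRunsSketch`, and the key name are hypothetical, and the real `useState` is asynchronous.

```typescript
// Hypothetical shape, not the PR's `TrackedRuns` type.
interface TrackedRunsSketch {
    current: Record<string, string>;
}

const stateCache = new Map<string, TrackedRunsSketch>();

// Synchronous stand-in for `useState`: returns the cached object by reference.
function useStateSketch(key: string, defaultValue: TrackedRunsSketch): TrackedRunsSketch {
    const existing = stateCache.get(key);
    if (existing) return existing;
    stateCache.set(key, defaultValue);
    return defaultValue;
}

const stateA = useStateSketch('TRACKED_RUNS', { current: {} });
const stateB = useStateSketch('TRACKED_RUNS', { current: {} }); // default is ignored here

stateA.current['run-1'] = 'RUNNING';
const seenThroughB = stateB.current['run-1'];
```

The guarantee only holds as long as callers mutate the returned object in place; reassigning or cloning it would break the link.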

        return runInfo;
    }

    declareLostRun(runName: string, reason?: string) {

There is a naming issue here: why does the function that declares a lost run end up deleting the run? Also, are we using `findAndDeleteRun` anywhere else? Otherwise it looks like exactly the same code, except for the last two lines of this function. I think it's better to merge them (unless I missed a call).
Make storage persistence lazy, instead of aggressive: values will be persisted periodically, instead of immediately on update.
This will make the stored values possibly outdated in case of crash or abrupt abort.
On the other hand, this aligns the behavior with the Apify SDK (`useAutoSavedValue` method), and allows having synchronous tracking updates, making it easier to extract and isolate tracking logic in a future refactor. It should, presumably, also reduce the number of API calls and improve the performance, with an extra improvement when using encryption, because data is encrypted only when persisted.

Refactor:
- `RunsTracker` → `RunTracker`.
- `OrchestratorContext` was replaced by `GlobalContext`, but the former will be removed in a future refactor.
- `tracking` and `storage` to `logging`: use "build" instead of "generate", and pass the whole `OrchestratorOptions` for simplicity.

Test runs: