@@ -18,13 +18,13 @@ package cache
1818
1919import (
2020 "bytes"
21- "errors"
2221 "fmt"
2322 "io"
2423 "os"
2524 "path/filepath"
2625 "sync"
2726
27+ "github.com/containerd/log"
2828 "github.com/containerd/stargz-snapshotter/util/cacheutil"
2929 "github.com/containerd/stargz-snapshotter/util/namedmutex"
3030)
@@ -61,6 +61,9 @@ type DirectoryCacheConfig struct {
6161 // Direct forcefully enables direct mode for all operation in cache.
6262 // Thus operation won't use on-memory caches.
6363 Direct bool
64+
65+ // EnableHardlink enables hardlinking of cache files to reduce memory usage
66+ EnableHardlink bool
6467}
6568
6669// TODO: contents validation.
@@ -99,6 +102,7 @@ type Writer interface {
// cacheOpt collects the per-call settings that Option functions apply to a
// single cache operation.
type cacheOpt struct {
	// direct, when true, makes the operation bypass the on-memory caches.
	direct bool

	// passThrough is set by the PassThrough option.
	// NOTE(review): its consumer is not visible in this part of the file.
	passThrough bool

	// chunkDigest is the digest supplied through the ChunkDigest option. It is
	// empty when the caller did not provide one.
	chunkDigest string
}
103107
104108type Option func (o * cacheOpt ) * cacheOpt
@@ -123,6 +127,14 @@ func PassThrough() Option {
123127 }
124128}
125129
130+ // ChunkDigest option allows specifying a chunk digest for the cache
131+ func ChunkDigest (digest string ) Option {
132+ return func (o * cacheOpt ) * cacheOpt {
133+ o .chunkDigest = digest
134+ return o
135+ }
136+ }
137+
126138func NewDirectoryCache (directory string , config DirectoryCacheConfig ) (BlobCache , error ) {
127139 if ! filepath .IsAbs (directory ) {
128140 return nil , fmt .Errorf ("dir cache path must be an absolute path; got %q" , directory )
@@ -173,8 +185,18 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
173185 wipDirectory : wipdir ,
174186 bufPool : bufPool ,
175187 direct : config .Direct ,
188+ syncAdd : config .SyncAdd ,
176189 }
177- dc .syncAdd = config .SyncAdd
190+
191+ // Initialize hardlink manager if enabled
192+ if config .EnableHardlink {
193+ var err error
194+ dc .hlManager , err = InitializeHardlinkManager (filepath .Dir (filepath .Dir (directory )))
195+ if err != nil {
196+ return nil , fmt .Errorf ("failed to initialize hardlink manager: %w" , err )
197+ }
198+ }
199+
178200 return dc , nil
179201}
180202
@@ -193,6 +215,8 @@ type directoryCache struct {
193215
194216 closed bool
195217 closedMu sync.Mutex
218+
219+ hlManager * HardlinkManager
196220}
197221
198222func (dc * directoryCache ) Get (key string , opts ... Option ) (Reader , error ) {
@@ -205,9 +229,15 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
205229 opt = o (opt )
206230 }
207231
232+ // Try to get from memory cache
208233 if ! dc .direct && ! opt .direct {
209- // Get data from memory
210- if b , done , ok := dc .cache .Get (key ); ok {
234+ // Try memory cache for digest or key
235+ cacheKey := key
236+ if shouldUseDigestCacheKey (dc .hlManager , opt .chunkDigest ) {
237+ cacheKey = opt .chunkDigest
238+ }
239+
240+ if b , done , ok := dc .cache .Get (cacheKey ); ok {
211241 return & reader {
212242 ReaderAt : bytes .NewReader (b .(* bytes.Buffer ).Bytes ()),
213243 closeFunc : func () error {
@@ -217,8 +247,8 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
217247 }, nil
218248 }
219249
220- // Get data from disk. If the file is already opened, use it.
221- if f , done , ok := dc .fileCache .Get (key ); ok {
250+ // Get data from file cache for digest or key
251+ if f , done , ok := dc .fileCache .Get (cacheKey ); ok {
222252 return & reader {
223253 ReaderAt : f .(* os.File ),
224254 closeFunc : func () error {
@@ -229,10 +259,21 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
229259 }
230260 }
231261
262+ // First try regular file path
263+ filepath := BuildCachePath (dc .directory , key )
264+
265+ // Check hardlink manager for existing digest file
266+ if shouldUseDigestCacheKey (dc .hlManager , opt .chunkDigest ) {
267+ if digestPath , exists := dc .hlManager .ProcessCacheGet (key , opt .chunkDigest , opt .direct ); exists {
268+ log .L .Debugf ("Using existing file for digest %q instead of key %q" , opt .chunkDigest , key )
269+ filepath = digestPath
270+ }
271+ }
272+
232273 // Open the cache file and read the target region
233274 // TODO: If the target cache is write-in-progress, should we wait for the completion
234275 // or simply report the cache miss?
235- file , err := os .Open (dc . cachePath ( key ) )
276+ file , err := os .Open (filepath )
236277 if err != nil {
237278 return nil , fmt .Errorf ("failed to open blob file for %q: %w" , key , err )
238279 }
@@ -261,7 +302,12 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
261302 return & reader {
262303 ReaderAt : file ,
263304 closeFunc : func () error {
264- _ , done , added := dc .fileCache .Add (key , file )
305+ cacheKey := key
306+ if shouldUseDigestCacheKey (dc .hlManager , opt .chunkDigest ) {
307+ cacheKey = opt .chunkDigest
308+ }
309+
310+ _ , done , added := dc .fileCache .Add (cacheKey , file )
265311 defer done () // Release it immediately. Cleaned up on eviction.
266312 if ! added {
267313 return file .Close () // file already exists in the cache. close it.
@@ -281,81 +327,79 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
281327 opt = o (opt )
282328 }
283329
284- wip , err := dc .wipFile (key )
330+ // If hardlink manager exists and digest is provided, check if a hardlink can be created
331+ if dc .hlManager != nil && opt .chunkDigest != "" {
332+ keyPath := BuildCachePath (dc .directory , key )
333+
334+ // Try to create a hardlink from existing digest file
335+ err := dc .hlManager .ProcessCacheAdd (key , opt .chunkDigest , keyPath )
336+ if err == nil {
337+ // Return a no-op writer since the file already exists
338+ return & writer {
339+ WriteCloser : nopWriteCloser (io .Discard ),
340+ commitFunc : func () error { return nil },
341+ abortFunc : func () error { return nil },
342+ }, nil
343+ }
344+ // If ProcessCacheAdd failed, log the error and continue with normal file creation
345+ log .L .Debugf ("Hardlink creation failed: %v" , err )
346+ }
347+
348+ // Create temporary file
349+ w , err := WipFile (dc .wipDirectory , key )
285350 if err != nil {
286351 return nil , err
287352 }
288- w := & writer {
289- WriteCloser : wip ,
353+
354+ // Create writer
355+ writer := & writer {
356+ WriteCloser : w ,
290357 commitFunc : func () error {
291358 if dc .isClosed () {
292359 return fmt .Errorf ("cache is already closed" )
293360 }
294- // Commit the cache contents
295- c := dc .cachePath (key )
296- if err := os .MkdirAll (filepath .Dir (c ), os .ModePerm ); err != nil {
297- var errs []error
298- if err := os .Remove (wip .Name ()); err != nil {
299- errs = append (errs , err )
361+
362+ // Commit file
363+ targetPath := BuildCachePath (dc .directory , key )
364+ if err := os .MkdirAll (filepath .Dir (targetPath ), 0700 ); err != nil {
365+ return fmt .Errorf ("failed to create cache directory: %w" , err )
366+ }
367+
368+ if err := os .Rename (w .Name (), targetPath ); err != nil {
369+ return fmt .Errorf ("failed to commit cache file: %w" , err )
370+ }
371+
372+ // If hardlink manager exists and digest is provided, register the file
373+ if shouldUseDigestCacheKey (dc .hlManager , opt .chunkDigest ) {
374+ // Register this file as the primary source for this digest
375+ if err := dc .hlManager .RegisterDigestFile (opt .chunkDigest , targetPath ); err != nil {
376+ return fmt .Errorf ("failed to register digest file: %w" , err )
377+ }
378+
379+ // Map key to digest
380+ internalKey := dc .hlManager .GenerateInternalKey (dc .directory , key )
381+ if err := dc .hlManager .MapKeyToDigest (internalKey , opt .chunkDigest ); err != nil {
382+ return fmt .Errorf ("failed to map key to digest: %w" , err )
300383 }
301- errs = append (errs , fmt .Errorf ("failed to create cache directory %q: %w" , c , err ))
302- return errors .Join (errs ... )
303384 }
304- return os .Rename (wip .Name (), c )
385+
386+ return nil
305387 },
306388 abortFunc : func () error {
307- return os .Remove (wip .Name ())
389+ return os .Remove (w .Name ())
308390 },
309391 }
310392
311393 // If "direct" option is specified, do not cache the passed data on memory.
312394 // This option is useful for preventing memory cache from being polluted by data
313395 // that won't be accessed immediately.
314396 if dc .direct || opt .direct {
315- return w , nil
397+ return writer , nil
316398 }
317399
400+ // Create memory cache
318401 b := dc .bufPool .Get ().(* bytes.Buffer )
319- memW := & writer {
320- WriteCloser : nopWriteCloser (io .Writer (b )),
321- commitFunc : func () error {
322- if dc .isClosed () {
323- w .Close ()
324- return fmt .Errorf ("cache is already closed" )
325- }
326- cached , done , added := dc .cache .Add (key , b )
327- if ! added {
328- dc .putBuffer (b ) // already exists in the cache. abort it.
329- }
330- commit := func () error {
331- defer done ()
332- defer w .Close ()
333- n , err := w .Write (cached .(* bytes.Buffer ).Bytes ())
334- if err != nil || n != cached .(* bytes.Buffer ).Len () {
335- w .Abort ()
336- return err
337- }
338- return w .Commit ()
339- }
340- if dc .syncAdd {
341- return commit ()
342- }
343- go func () {
344- if err := commit (); err != nil {
345- fmt .Println ("failed to commit to file:" , err )
346- }
347- }()
348- return nil
349- },
350- abortFunc : func () error {
351- defer w .Close ()
352- defer w .Abort ()
353- dc .putBuffer (b ) // abort it.
354- return nil
355- },
356- }
357-
358- return memW , nil
402+ return dc .wrapMemoryWriter (b , writer , key )
359403}
360404
361405func (dc * directoryCache ) putBuffer (b * bytes.Buffer ) {
@@ -380,14 +424,6 @@ func (dc *directoryCache) isClosed() bool {
380424 return closed
381425}
382426
383- func (dc * directoryCache ) cachePath (key string ) string {
384- return filepath .Join (dc .directory , key [:2 ], key )
385- }
386-
387- func (dc * directoryCache ) wipFile (key string ) (* os.File , error ) {
388- return os .CreateTemp (dc .wipDirectory , key + "-*" )
389- }
390-
391427func NewMemoryCache () BlobCache {
392428 return & MemoryCache {
393429 Membuf : map [string ]* bytes.Buffer {},
@@ -463,3 +499,56 @@ func (w *writeCloser) Close() error { return w.closeFunc() }
463499func nopWriteCloser (w io.Writer ) io.WriteCloser {
464500 return & writeCloser {w , func () error { return nil }}
465501}
502+
503+ // wrapMemoryWriter wraps a writer with memory caching
504+ func (dc * directoryCache ) wrapMemoryWriter (b * bytes.Buffer , w * writer , key string ) (Writer , error ) {
505+ return & writer {
506+ WriteCloser : nopWriteCloser (b ),
507+ commitFunc : func () error {
508+ if dc .isClosed () {
509+ w .Close ()
510+ return fmt .Errorf ("cache is already closed" )
511+ }
512+
513+ cached , done , added := dc .cache .Add (key , b )
514+ if ! added {
515+ dc .putBuffer (b )
516+ }
517+
518+ commit := func () error {
519+ defer done ()
520+ defer w .Close ()
521+
522+ n , err := w .Write (cached .(* bytes.Buffer ).Bytes ())
523+ if err != nil || n != cached .(* bytes.Buffer ).Len () {
524+ w .Abort ()
525+ return err
526+ }
527+ return w .Commit ()
528+ }
529+
530+ if dc .syncAdd {
531+ return commit ()
532+ }
533+
534+ go func () {
535+ if err := commit (); err != nil {
536+ log .L .Infof ("failed to commit to file: %v" , err )
537+ }
538+ }()
539+ return nil
540+ },
541+ abortFunc : func () error {
542+ defer w .Close ()
543+ defer w .Abort ()
544+ dc .putBuffer (b )
545+ return nil
546+ },
547+ }, nil
548+ }
549+
550+ // shouldUseDigestCacheKey determines whether to use the digest as the cache key.
551+ // Returns true only if the hardlink manager exists, is enabled, and chunkDigest is not empty.
552+ func shouldUseDigestCacheKey (hlManager * HardlinkManager , chunkDigest string ) bool {
553+ return hlManager != nil && hlManager .IsEnabled () && chunkDigest != ""
554+ }
0 commit comments