@@ -21,6 +21,7 @@ import (
2121 "os"
2222 "os/signal"
2323 "strconv"
24+ "strings"
2425 "syscall"
2526
2627 snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
@@ -35,6 +36,8 @@ import (
3536 "github.com/firecracker-microvm/firecracker-containerd/snapshotter/config"
3637 "github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux"
3738 "github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/cache"
39+ "github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/metrics"
40+ "github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/metrics/discovery"
3841 "github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/proxy"
3942 proxyaddress "github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/proxy/address"
4043)
@@ -52,7 +55,8 @@ func Run(config config.Config) error {
5255
5356 group , ctx := errgroup .WithContext (ctx )
5457
55- snapshotter , err := initSnapshotter (ctx , config )
58+ cache := cache .NewSnapshotterCache ()
59+ snapshotter , err := initSnapshotter (ctx , config , cache )
5660 if err != nil {
5761 log .G (ctx ).WithFields (
5862 logrus.Fields {"resolver" : config .Snapshotter .Proxy .Address .Resolver .Type },
@@ -76,6 +80,15 @@ func Run(config config.Config) error {
7680 return err
7781 }
7882
83+ var serviceDiscovery * discovery.ServiceDiscovery
84+ if config .Snapshotter .Metrics .Enable {
85+ sdPort := config .Snapshotter .Metrics .ServiceDiscoveryPort
86+ serviceDiscovery = discovery .NewServiceDiscovery (config .Snapshotter .Metrics .Host , sdPort , cache )
87+ group .Go (func () error {
88+ return serviceDiscovery .Serve ()
89+ })
90+ }
91+
7992 group .Go (func () error {
8093 return grpcServer .Serve (listener )
8194 })
@@ -88,6 +101,11 @@ func Run(config config.Config) error {
88101 if err := snapshotter .Close (); err != nil {
89102 log .G (ctx ).WithError (err ).Error ("failed to close snapshotter" )
90103 }
104+ if serviceDiscovery != nil {
105+ if err := serviceDiscovery .Shutdown (ctx ); err != nil {
106+ log .G (ctx ).WithError (err ).Error ("failed to shutdown service discovery server" )
107+ }
108+ }
91109 }()
92110
93111 for {
@@ -123,13 +141,13 @@ func initResolver(config config.Config) (proxyaddress.Resolver, error) {
123141const base10 = 10
124142const bits32 = 32
125143
126- func initSnapshotter (ctx context.Context , config config.Config ) (snapshots.Snapshotter , error ) {
144+ func initSnapshotter (ctx context.Context , config config.Config , cache cache. Cache ) (snapshots.Snapshotter , error ) {
127145 resolver , err := initResolver (config )
128146 if err != nil {
129147 return nil , err
130148 }
131149
132- newProxySnapshotterFunc := func (ctx context.Context , namespace string ) (snapshots. Snapshotter , error ) {
150+ newProxySnapshotterFunc := func (ctx context.Context , namespace string ) (* proxy. RemoteSnapshotter , error ) {
133151 r := resolver
134152 response , err := r .Get (namespace )
135153 if err != nil {
@@ -148,22 +166,58 @@ func initSnapshotter(ctx context.Context, config config.Config) (snapshots.Snaps
148166 return vsock .DialContext (ctx , host , uint32 (port ), vsock .WithLogger (log .G (ctx )))
149167 }
150168
169+ var metricsProxy * metrics.Proxy
170+ // TODO (ginglis13): port management and lifecycle ties in to overall metrics proxy
171+ // server lifecycle. tracked here: https://github.com/firecracker-microvm/firecracker-containerd/issues/607
172+ portMap := make (map [int ]bool )
151173 if config .Snapshotter .Metrics .Enable {
152174 metricsPort , err := strconv .ParseUint (response .MetricsPort , base10 , bits32 )
153175 if err != nil {
154176 return nil , err
155177 }
156178
157- // TODO (ginglis13) metricsDialer func to be defined here using metricsPort. It will dial
158- // the same host but connect via its own port. The metrics proxy will be configured in NewProxySnapshotter
159- // task 2 of https://github.com/firecracker-microvm/firecracker-containerd/issues/602
160- _ = func (ctx context.Context , _ string ) (net.Conn , error ) {
179+ metricsDialer := func (ctx context.Context , _ , _ string ) (net.Conn , error ) {
161180 return vsock .DialContext (ctx , host , uint32 (metricsPort ), vsock .WithLogger (log .G (ctx )))
162181 }
182+
183+ portRange := config .Snapshotter .Metrics .PortRange
184+ metricsHost := config .Snapshotter .Metrics .Host
185+
186+ // Assign a port for metrics proxy server.
187+ ports := strings .Split (portRange , "-" )
188+ portRangeError := fmt .Errorf ("invalid port range %s" , portRange )
189+ if len (ports ) < 2 {
190+ return nil , portRangeError
191+ }
192+ lower , err := strconv .Atoi (ports [0 ])
193+ if err != nil {
194+ return nil , portRangeError
195+ }
196+ upper , err := strconv .Atoi (ports [1 ])
197+ if err != nil {
198+ return nil , portRangeError
199+ }
200+ port := - 1
201+ for p := lower ; p <= upper ; p ++ {
202+ if _ , ok := portMap [p ]; ! ok {
203+ port = p
204+ portMap [p ] = true
205+ break
206+ }
207+ }
208+ if port < 0 {
209+ return nil , fmt .Errorf ("invalid port: %d" , port )
210+ }
211+
212+ metricsProxy , err = metrics .NewProxy (metricsHost , port , response .Labels , metricsDialer )
213+ if err != nil {
214+ return nil , err
215+ }
216+
163217 }
164218
165- return proxy .NewProxySnapshotter (ctx , host , snapshotterDialer )
219+ return proxy .NewProxySnapshotter (ctx , host , snapshotterDialer , metricsProxy )
166220 }
167221
168- return demux .NewSnapshotter (cache . NewSnapshotterCache () , newProxySnapshotterFunc ), nil
222+ return demux .NewSnapshotter (cache , newProxySnapshotterFunc ), nil
169223}
0 commit comments