@@ -17,7 +17,6 @@ import (
1717 "context"
1818 "fmt"
1919 "net"
20- "net/url"
2120 "os"
2221 "os/signal"
2322 "strconv"
@@ -56,7 +55,29 @@ func Run(config config.Config) error {
5655 group , ctx := errgroup .WithContext (ctx )
5756
5857 cache := cache .NewSnapshotterCache ()
59- snapshotter , err := initSnapshotter (ctx , config , cache )
58+
59+ var (
60+ monitor * metrics.Monitor
61+ serviceDiscovery * discovery.ServiceDiscovery
62+ )
63+ if config .Snapshotter .Metrics .Enable {
64+ sdHost := config .Snapshotter .Metrics .Host
65+ sdPort := config .Snapshotter .Metrics .ServiceDiscoveryPort
66+ serviceDiscovery := discovery .NewServiceDiscovery (sdHost , sdPort , cache )
67+ monitor , err := initMetricsProxyMonitor (config .Snapshotter .Metrics .PortRange )
68+ if err != nil {
69+ log .G (ctx ).WithError (err ).Fatal ("failed creating metrics proxy monitor" )
70+ return err
71+ }
72+ group .Go (func () error {
73+ return serviceDiscovery .Serve ()
74+ })
75+ group .Go (func () error {
76+ return monitor .Start ()
77+ })
78+ }
79+
80+ snapshotter , err := initSnapshotter (ctx , config , cache , monitor )
6081 if err != nil {
6182 log .G (ctx ).WithFields (
6283 logrus.Fields {"resolver" : config .Snapshotter .Proxy .Address .Resolver .Type },
@@ -80,15 +101,6 @@ func Run(config config.Config) error {
80101 return err
81102 }
82103
83- var serviceDiscovery * discovery.ServiceDiscovery
84- if config .Snapshotter .Metrics .Enable {
85- sdPort := config .Snapshotter .Metrics .ServiceDiscoveryPort
86- serviceDiscovery = discovery .NewServiceDiscovery (config .Snapshotter .Metrics .Host , sdPort , cache )
87- group .Go (func () error {
88- return serviceDiscovery .Serve ()
89- })
90- }
91-
92104 group .Go (func () error {
93105 return grpcServer .Serve (listener )
94106 })
@@ -101,10 +113,13 @@ func Run(config config.Config) error {
101113 if err := snapshotter .Close (); err != nil {
102114 log .G (ctx ).WithError (err ).Error ("failed to close snapshotter" )
103115 }
104- if serviceDiscovery != nil {
116+ if config . Snapshotter . Metrics . Enable {
105117 if err := serviceDiscovery .Shutdown (ctx ); err != nil {
106118 log .G (ctx ).WithError (err ).Error ("failed to shutdown service discovery server" )
107119 }
120+ // Senders to this channel would panic if it is closed. However snapshotter.Close() will
121+ // shutdown all metrics proxies and ensure there are no more senders over the channel.
122+ monitor .Stop ()
108123 }
109124 }()
110125
@@ -141,23 +156,19 @@ func initResolver(config config.Config) (proxyaddress.Resolver, error) {
141156const base10 = 10
142157const bits32 = 32
143158
144- func initSnapshotter (ctx context.Context , config config.Config , cache cache.Cache ) (snapshots.Snapshotter , error ) {
159+ func initSnapshotter (ctx context.Context , config config.Config , cache cache.Cache , monitor * metrics. Monitor ) (snapshots.Snapshotter , error ) {
145160 resolver , err := initResolver (config )
146161 if err != nil {
147162 return nil , err
148163 }
149164
150- newProxySnapshotterFunc := func (ctx context.Context , namespace string ) (* proxy.RemoteSnapshotter , error ) {
165+ newRemoteSnapshotterFunc := func (ctx context.Context , namespace string ) (* proxy.RemoteSnapshotter , error ) {
151166 r := resolver
152167 response , err := r .Get (namespace )
153168 if err != nil {
154169 return nil , err
155170 }
156- u , err := url .Parse (response .Address )
157- if err != nil {
158- return nil , err
159- }
160- host := u .Hostname ()
171+ host := response .Address
161172 port , err := strconv .ParseUint (response .SnapshotterPort , base10 , bits32 )
162173 if err != nil {
163174 return nil , err
@@ -167,57 +178,48 @@ func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cach
167178 }
168179
169180 var metricsProxy * metrics.Proxy
170- // TODO (ginglis13): port management and lifecycle ties in to overall metrics proxy
171- // server lifecycle. tracked here: https://github.com/firecracker-microvm/firecracker-containerd/issues/607
172- portMap := make (map [int ]bool )
173181 if config .Snapshotter .Metrics .Enable {
174- metricsPort , err := strconv . ParseUint ( response .MetricsPort , base10 , bits32 )
182+ metricsProxy , err = initMetricsProxy ( config , monitor , host , response .MetricsPort , response . Labels )
175183 if err != nil {
176184 return nil , err
177185 }
186+ }
178187
179- metricsDialer := func (ctx context.Context , _ , _ string ) (net.Conn , error ) {
180- return vsock .DialContext (ctx , host , uint32 (metricsPort ), vsock .WithLogger (log .G (ctx )))
181- }
188+ return proxy .NewRemoteSnapshotter (ctx , host , snapshotterDialer , metricsProxy )
189+ }
182190
183- portRange := config . Snapshotter . Metrics . PortRange
184- metricsHost := config . Snapshotter . Metrics . Host
191+ return demux . NewSnapshotter ( cache , newRemoteSnapshotterFunc ), nil
192+ }
185193
186- // Assign a port for metrics proxy server.
187- ports := strings .Split (portRange , "-" )
188- portRangeError := fmt .Errorf ("invalid port range %s" , portRange )
189- if len (ports ) < 2 {
190- return nil , portRangeError
191- }
192- lower , err := strconv .Atoi (ports [0 ])
193- if err != nil {
194- return nil , portRangeError
195- }
196- upper , err := strconv .Atoi (ports [1 ])
197- if err != nil {
198- return nil , portRangeError
199- }
200- port := - 1
201- for p := lower ; p <= upper ; p ++ {
202- if _ , ok := portMap [p ]; ! ok {
203- port = p
204- portMap [p ] = true
205- break
206- }
207- }
208- if port < 0 {
209- return nil , fmt .Errorf ("invalid port: %d" , port )
210- }
194+ func initMetricsProxyMonitor (portRange string ) (* metrics.Monitor , error ) {
195+ ports := strings .Split (portRange , "-" )
196+ portRangeError := fmt .Errorf ("invalid port range %s" , portRange )
197+ if len (ports ) < 2 {
198+ return nil , portRangeError
199+ }
200+ lower , err := strconv .Atoi (ports [0 ])
201+ if err != nil {
202+ return nil , portRangeError
203+ }
204+ upper , err := strconv .Atoi (ports [1 ])
205+ if err != nil {
206+ return nil , portRangeError
207+ }
211208
212- metricsProxy , err = metrics .NewProxy (metricsHost , port , response .Labels , metricsDialer )
213- if err != nil {
214- return nil , err
215- }
209+ return metrics .NewMonitor (lower , upper )
210+ }
216211
217- }
212+ func initMetricsProxy (config config.Config , monitor * metrics.Monitor , host , port string , labels map [string ]string ) (* metrics.Proxy , error ) {
213+ metricsPort , err := strconv .ParseUint (port , base10 , bits32 )
214+ if err != nil {
215+ return nil , err
216+ }
218217
219- return proxy .NewProxySnapshotter (ctx , host , snapshotterDialer , metricsProxy )
218+ metricsDialer := func (ctx context.Context , _ , _ string ) (net.Conn , error ) {
219+ return vsock .DialContext (ctx , host , uint32 (metricsPort ), vsock .WithLogger (log .G (ctx )))
220220 }
221221
222- return demux .NewSnapshotter (cache , newProxySnapshotterFunc ), nil
222+ metricsHost := config .Snapshotter .Metrics .Host
223+
224+ return metrics .NewProxy (metricsHost , monitor , labels , metricsDialer )
223225}
0 commit comments