@@ -55,7 +55,29 @@ func Run(config config.Config) error {
5555 group , ctx := errgroup .WithContext (ctx )
5656
5757 cache := cache .NewSnapshotterCache ()
58- snapshotter , err := initSnapshotter (ctx , config , cache )
58+
59+ var (
60+ monitor * metrics.Monitor
61+ serviceDiscovery * discovery.ServiceDiscovery
62+ )
63+ if config .Snapshotter .Metrics .Enable {
64+ sdHost := config .Snapshotter .Metrics .Host
65+ sdPort := config .Snapshotter .Metrics .ServiceDiscoveryPort
66+ serviceDiscovery := discovery .NewServiceDiscovery (sdHost , sdPort , cache )
67+ monitor , err := initMetricsProxyMonitor (config .Snapshotter .Metrics .PortRange )
68+ if err != nil {
69+ log .G (ctx ).WithError (err ).Fatal ("failed creating metrics proxy monitor" )
70+ return err
71+ }
72+ group .Go (func () error {
73+ return serviceDiscovery .Serve ()
74+ })
75+ group .Go (func () error {
76+ return monitor .Start ()
77+ })
78+ }
79+
80+ snapshotter , err := initSnapshotter (ctx , config , cache , monitor )
5981 if err != nil {
6082 log .G (ctx ).WithFields (
6183 logrus.Fields {"resolver" : config .Snapshotter .Proxy .Address .Resolver .Type },
@@ -79,15 +101,6 @@ func Run(config config.Config) error {
79101 return err
80102 }
81103
82- var serviceDiscovery * discovery.ServiceDiscovery
83- if config .Snapshotter .Metrics .Enable {
84- sdPort := config .Snapshotter .Metrics .ServiceDiscoveryPort
85- serviceDiscovery = discovery .NewServiceDiscovery (config .Snapshotter .Metrics .Host , sdPort , cache )
86- group .Go (func () error {
87- return serviceDiscovery .Serve ()
88- })
89- }
90-
91104 group .Go (func () error {
92105 return grpcServer .Serve (listener )
93106 })
@@ -100,10 +113,13 @@ func Run(config config.Config) error {
100113 if err := snapshotter .Close (); err != nil {
101114 log .G (ctx ).WithError (err ).Error ("failed to close snapshotter" )
102115 }
103- if serviceDiscovery != nil {
116+ if config . Snapshotter . Metrics . Enable {
104117 if err := serviceDiscovery .Shutdown (ctx ); err != nil {
105118 log .G (ctx ).WithError (err ).Error ("failed to shutdown service discovery server" )
106119 }
120+ // Senders to this channel would panic if it is closed. However snapshotter.Close() will
121+ // shutdown all metrics proxies and ensure there are no more senders over the channel.
122+ monitor .Stop ()
107123 }
108124 }()
109125
@@ -140,13 +156,13 @@ func initResolver(config config.Config) (proxyaddress.Resolver, error) {
140156const base10 = 10
141157const bits32 = 32
142158
143- func initSnapshotter (ctx context.Context , config config.Config , cache cache.Cache ) (snapshots.Snapshotter , error ) {
159+ func initSnapshotter (ctx context.Context , config config.Config , cache cache.Cache , monitor * metrics. Monitor ) (snapshots.Snapshotter , error ) {
144160 resolver , err := initResolver (config )
145161 if err != nil {
146162 return nil , err
147163 }
148164
149- newProxySnapshotterFunc := func (ctx context.Context , namespace string ) (* proxy.RemoteSnapshotter , error ) {
165+ newRemoteSnapshotterFunc := func (ctx context.Context , namespace string ) (* proxy.RemoteSnapshotter , error ) {
150166 r := resolver
151167 response , err := r .Get (namespace )
152168 if err != nil {
@@ -162,57 +178,48 @@ func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cach
162178 }
163179
164180 var metricsProxy * metrics.Proxy
165- // TODO (ginglis13): port management and lifecycle ties in to overall metrics proxy
166- // server lifecycle. tracked here: https://github.com/firecracker-microvm/firecracker-containerd/issues/607
167- portMap := make (map [int ]bool )
168181 if config .Snapshotter .Metrics .Enable {
169- metricsPort , err := strconv . ParseUint ( response .MetricsPort , base10 , bits32 )
182+ metricsProxy , err = initMetricsProxy ( config , monitor , host , response .MetricsPort , response . Labels )
170183 if err != nil {
171184 return nil , err
172185 }
186+ }
173187
174- metricsDialer := func (ctx context.Context , _ , _ string ) (net.Conn , error ) {
175- return vsock .DialContext (ctx , host , uint32 (metricsPort ), vsock .WithLogger (log .G (ctx )))
176- }
188+ return proxy .NewRemoteSnapshotter (ctx , host , snapshotterDialer , metricsProxy )
189+ }
177190
178- portRange := config . Snapshotter . Metrics . PortRange
179- metricsHost := config . Snapshotter . Metrics . Host
191+ return demux . NewSnapshotter ( cache , newRemoteSnapshotterFunc ), nil
192+ }
180193
181- // Assign a port for metrics proxy server.
182- ports := strings .Split (portRange , "-" )
183- portRangeError := fmt .Errorf ("invalid port range %s" , portRange )
184- if len (ports ) < 2 {
185- return nil , portRangeError
186- }
187- lower , err := strconv .Atoi (ports [0 ])
188- if err != nil {
189- return nil , portRangeError
190- }
191- upper , err := strconv .Atoi (ports [1 ])
192- if err != nil {
193- return nil , portRangeError
194- }
195- port := - 1
196- for p := lower ; p <= upper ; p ++ {
197- if _ , ok := portMap [p ]; ! ok {
198- port = p
199- portMap [p ] = true
200- break
201- }
202- }
203- if port < 0 {
204- return nil , fmt .Errorf ("invalid port: %d" , port )
205- }
194+ func initMetricsProxyMonitor (portRange string ) (* metrics.Monitor , error ) {
195+ ports := strings .Split (portRange , "-" )
196+ portRangeError := fmt .Errorf ("invalid port range %s" , portRange )
197+ if len (ports ) < 2 {
198+ return nil , portRangeError
199+ }
200+ lower , err := strconv .Atoi (ports [0 ])
201+ if err != nil {
202+ return nil , portRangeError
203+ }
204+ upper , err := strconv .Atoi (ports [1 ])
205+ if err != nil {
206+ return nil , portRangeError
207+ }
206208
207- metricsProxy , err = metrics .NewProxy (metricsHost , port , response .Labels , metricsDialer )
208- if err != nil {
209- return nil , err
210- }
209+ return metrics .NewMonitor (lower , upper )
210+ }
211211
212- }
212+ func initMetricsProxy (config config.Config , monitor * metrics.Monitor , host , port string , labels map [string ]string ) (* metrics.Proxy , error ) {
213+ metricsPort , err := strconv .ParseUint (port , base10 , bits32 )
214+ if err != nil {
215+ return nil , err
216+ }
213217
214- return proxy .NewProxySnapshotter (ctx , host , snapshotterDialer , metricsProxy )
218+ metricsDialer := func (ctx context.Context , _ , _ string ) (net.Conn , error ) {
219+ return vsock .DialContext (ctx , host , uint32 (metricsPort ), vsock .WithLogger (log .G (ctx )))
215220 }
216221
217- return demux .NewSnapshotter (cache , newProxySnapshotterFunc ), nil
222+ metricsHost := config .Snapshotter .Metrics .Host
223+
224+ return metrics .NewProxy (metricsHost , monitor , labels , metricsDialer )
218225}
0 commit comments