Skip to content

Commit a1a4191

Browse files
committed
more work on the aggregators
1 parent cfc5174 commit a1a4191

File tree

1 file changed

+100
-51
lines changed

1 file changed

+100
-51
lines changed

internal/routing/aggregator.go

Lines changed: 100 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ func (a *AllSucceededAggregator) BatchAdd(results map[string]AggregatorResErr) e
117117
return nil
118118
}
119119

120-
func (a *AllSucceededAggregator) BatchSlice(result []AggregatorResErr) error {
121-
for _, res := range result {
120+
func (a *AllSucceededAggregator) BatchSlice(results []AggregatorResErr) error {
121+
for _, res := range results {
122122
err := a.Add(res.Result, res.Err)
123123
if err != nil {
124124
return err
@@ -185,8 +185,8 @@ func (a *OneSucceededAggregator) AddWithKey(key string, result interface{}, err
185185
return a.Add(result, err)
186186
}
187187

188-
func (a *OneSucceededAggregator) BatchSlice(result []AggregatorResErr) error {
189-
for _, res := range result {
188+
func (a *OneSucceededAggregator) BatchSlice(results []AggregatorResErr) error {
189+
for _, res := range results {
190190
err := a.Add(res.Result, res.Err)
191191
if err != nil {
192192
return err
@@ -202,7 +202,7 @@ func (a *OneSucceededAggregator) BatchSlice(result []AggregatorResErr) error {
202202

203203
func (a *OneSucceededAggregator) Result() (interface{}, error) {
204204
res, e := a.res.Load(), a.err.Load()
205-
if res != nil {
205+
if res == nil {
206206
return nil, e.(error)
207207
}
208208

@@ -223,6 +223,7 @@ func (a *AggSumAggregator) Add(result interface{}, err error) error {
223223
if result != nil {
224224
val, err := toInt64(result)
225225
if err != nil {
226+
a.err.CompareAndSwap(nil, err)
226227
return err
227228
}
228229
atomic.AddInt64(a.res, val)
@@ -232,37 +233,49 @@ func (a *AggSumAggregator) Add(result interface{}, err error) error {
232233
}
233234

234235
func (a *AggSumAggregator) BatchAdd(results map[string]AggregatorResErr) error {
236+
var sum int64
237+
235238
for _, res := range results {
236-
err := a.Add(res.Result, res.Err)
237-
if err != nil {
238-
return err
239+
if res.Err != nil {
240+
a.Add(res.Result, res.Err)
241+
return nil
239242
}
240243

241-
if res.Err != nil {
244+
intRes, err := toInt64(res)
245+
if err != nil {
246+
a.Add(nil, err)
242247
return nil
243248
}
249+
250+
sum += intRes
244251
}
245252

246-
return nil
253+
return a.Add(sum, nil)
247254
}
248255

249256
func (a *AggSumAggregator) AddWithKey(key string, result interface{}, err error) error {
250257
return a.Add(result, err)
251258
}
252259

253-
func (a *AggSumAggregator) BatchSlice(result []AggregatorResErr) error {
254-
for _, res := range result {
255-
err := a.Add(res.Result, res.Err)
256-
if err != nil {
257-
return err
258-
}
260+
func (a *AggSumAggregator) BatchSlice(results []AggregatorResErr) error {
261+
var sum int64
259262

263+
for _, res := range results {
260264
if res.Err != nil {
265+
a.Add(res.Result, res.Err)
261266
return nil
262267
}
268+
269+
intRes, err := toInt64(res)
270+
if err != nil {
271+
a.Add(nil, err)
272+
return nil
273+
}
274+
275+
sum += intRes
263276
}
264277

265-
return nil
278+
return a.Add(sum, nil)
266279
}
267280

268281
func (a *AggSumAggregator) Result() (interface{}, error) {
@@ -298,37 +311,55 @@ func (a *AggMinAggregator) Add(result interface{}, err error) error {
298311
}
299312

300313
func (a *AggMinAggregator) BatchAdd(results map[string]AggregatorResErr) error {
314+
min := int64(math.MaxInt64)
315+
301316
for _, res := range results {
302-
err := a.Add(res.Result, res.Err)
303-
if err != nil {
304-
return err
317+
if res.Err != nil {
318+
_ = a.Add(nil, res.Err)
319+
return nil
305320
}
306321

307-
if res.Err != nil {
322+
resInt, err := toInt64(res.Result)
323+
if err != nil {
324+
_ = a.Add(nil, res.Err)
308325
return nil
309326
}
327+
328+
if resInt < min {
329+
min = resInt
330+
}
331+
310332
}
311333

312-
return nil
334+
return a.Add(min, nil)
313335
}
314336

315337
func (a *AggMinAggregator) AddWithKey(key string, result interface{}, err error) error {
316338
return a.Add(result, err)
317339
}
318340

319-
func (a *AggMinAggregator) BatchSlice(result []AggregatorResErr) error {
320-
for _, res := range result {
321-
err := a.Add(res.Result, res.Err)
322-
if err != nil {
323-
return err
324-
}
341+
func (a *AggMinAggregator) BatchSlice(results []AggregatorResErr) error {
342+
min := int64(math.MaxInt64)
325343

344+
for _, res := range results {
326345
if res.Err != nil {
346+
_ = a.Add(nil, res.Err)
347+
return nil
348+
}
349+
350+
resInt, err := toInt64(res.Result)
351+
if err != nil {
352+
_ = a.Add(nil, res.Err)
327353
return nil
328354
}
355+
356+
if resInt < min {
357+
min = resInt
358+
}
359+
329360
}
330361

331-
return nil
362+
return a.Add(min, nil)
332363
}
333364

334365
func (a *AggMinAggregator) Result() (interface{}, error) {
@@ -368,37 +399,55 @@ func (a *AggMaxAggregator) Add(result interface{}, err error) error {
368399
}
369400

370401
func (a *AggMaxAggregator) BatchAdd(results map[string]AggregatorResErr) error {
402+
max := int64(math.MinInt64)
403+
371404
for _, res := range results {
372-
err := a.Add(res.Result, res.Err)
373-
if err != nil {
374-
return err
405+
if res.Err != nil {
406+
_ = a.Add(nil, res.Err)
407+
return nil
375408
}
376409

377-
if res.Err != nil {
410+
resInt, err := toInt64(res.Result)
411+
if err != nil {
412+
_ = a.Add(nil, res.Err)
378413
return nil
379414
}
415+
416+
if resInt > max {
417+
max = resInt
418+
}
419+
380420
}
381421

382-
return nil
422+
return a.Add(max, nil)
383423
}
384424

385425
func (a *AggMaxAggregator) AddWithKey(key string, result interface{}, err error) error {
386426
return a.Add(result, err)
387427
}
388428

389-
func (a *AggMaxAggregator) BatchSlice(result []AggregatorResErr) error {
390-
for _, res := range result {
391-
err := a.Add(res.Result, res.Err)
392-
if err != nil {
393-
return err
394-
}
429+
func (a *AggMaxAggregator) BatchSlice(results []AggregatorResErr) error {
430+
max := int64(math.MinInt64)
395431

432+
for _, res := range results {
396433
if res.Err != nil {
434+
_ = a.Add(nil, res.Err)
435+
return nil
436+
}
437+
438+
resInt, err := toInt64(res.Result)
439+
if err != nil {
440+
_ = a.Add(nil, res.Err)
397441
return nil
398442
}
443+
444+
if resInt > max {
445+
max = resInt
446+
}
447+
399448
}
400449

401-
return nil
450+
return a.Add(max, nil)
402451
}
403452

404453
func (a *AggMaxAggregator) Result() (interface{}, error) {
@@ -463,8 +512,8 @@ func (a *AggLogicalAndAggregator) AddWithKey(key string, result interface{}, err
463512
return a.Add(result, err)
464513
}
465514

466-
func (a *AggLogicalAndAggregator) BatchSlice(result []AggregatorResErr) error {
467-
for _, res := range result {
515+
func (a *AggLogicalAndAggregator) BatchSlice(results []AggregatorResErr) error {
516+
for _, res := range results {
468517
err := a.Add(res.Result, res.Err)
469518
if err != nil {
470519
return err
@@ -539,8 +588,8 @@ func (a *AggLogicalOrAggregator) AddWithKey(key string, result interface{}, err
539588
return a.Add(result, err)
540589
}
541590

542-
func (a *AggLogicalOrAggregator) BatchSlice(result []AggregatorResErr) error {
543-
for _, res := range result {
591+
func (a *AggLogicalOrAggregator) BatchSlice(results []AggregatorResErr) error {
592+
for _, res := range results {
544593
err := a.Add(res.Result, res.Err)
545594
if err != nil {
546595
return err
@@ -650,11 +699,11 @@ func (a *DefaultKeylessAggregator) AddWithKey(key string, result interface{}, er
650699
return a.Add(result, err)
651700
}
652701

653-
func (a *DefaultKeylessAggregator) BatchSlice(result []AggregatorResErr) error {
702+
func (a *DefaultKeylessAggregator) BatchSlice(results []AggregatorResErr) error {
654703
a.mu.Lock()
655704
defer a.mu.Unlock()
656705

657-
for _, res := range result {
706+
for _, res := range results {
658707
err := a.add(res.Result, res.Err)
659708
if err != nil {
660709
return err
@@ -773,11 +822,11 @@ func (a *DefaultKeyedAggregator) SetKeyOrder(keyOrder []string) {
773822
a.keyOrder = keyOrder
774823
}
775824

776-
func (a *DefaultKeyedAggregator) BatchSlice(result []AggregatorResErr) error {
825+
func (a *DefaultKeyedAggregator) BatchSlice(results []AggregatorResErr) error {
777826
a.mu.Lock()
778827
defer a.mu.Unlock()
779828

780-
for _, res := range result {
829+
for _, res := range results {
781830
err := a.add(res.Result, res.Err)
782831
if err != nil {
783832
return err
@@ -861,11 +910,11 @@ func (a *SpecialAggregator) AddWithKey(key string, result interface{}, err error
861910
return a.Add(result, err)
862911
}
863912

864-
func (a *SpecialAggregator) BatchSlice(result []AggregatorResErr) error {
913+
func (a *SpecialAggregator) BatchSlice(results []AggregatorResErr) error {
865914
a.mu.Lock()
866915
defer a.mu.Unlock()
867916

868-
for _, res := range result {
917+
for _, res := range results {
869918
err := a.add(res.Result, res.Err)
870919
if err != nil {
871920
return err

0 commit comments

Comments
 (0)