Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit f1b8151

Browse files
committed
Switch to otel
1 parent 1da338e commit f1b8151

File tree

2 files changed

+57
-57
lines changed

2 files changed

+57
-57
lines changed

pkg/phlaredb/query/iters.go

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ import (
1010
"sync"
1111

1212
"github.com/grafana/dskit/multierror"
13-
"github.com/opentracing/opentracing-go"
14-
"github.com/opentracing/opentracing-go/log"
1513
"github.com/segmentio/parquet-go"
14+
"go.opentelemetry.io/otel"
15+
"go.opentelemetry.io/otel/attribute"
16+
"go.opentelemetry.io/otel/trace"
1617

1718
"github.com/grafana/phlare/pkg/iter"
1819
)
@@ -39,6 +40,10 @@ type RowNumberWithDefinitionLevel struct {
3940
DefinitionLevel int
4041
}
4142

43+
func (r *RowNumberWithDefinitionLevel) String() string {
44+
return fmt.Sprintf("%v:%v", r.RowNumber, r.DefinitionLevel)
45+
}
46+
4247
// EmptyRowNumber creates an empty invalid row number.
4348
func EmptyRowNumber() RowNumber {
4449
return RowNumber{-1, -1, -1, -1, -1, -1}
@@ -782,7 +787,7 @@ type SyncIterator struct {
782787
filter *InstrumentedPredicate
783788

784789
// Status
785-
span opentracing.Span
790+
span trace.Span
786791
metrics *Metrics
787792
curr RowNumber
788793
currRowGroup parquet.RowGroup
@@ -839,10 +844,12 @@ func NewSyncIterator(ctx context.Context, rgs []parquet.RowGroup, column int, co
839844
rn.Skip(rg.NumRows())
840845
}
841846

842-
span, _ := opentracing.StartSpanFromContext(ctx, "syncIterator", opentracing.Tags{
843-
"columnIndex": column,
844-
"column": columnName,
845-
})
847+
tr := otel.Tracer("query")
848+
849+
ctx, span := tr.Start(ctx, "syncIterator", trace.WithAttributes(
850+
attribute.String("column", columnName),
851+
attribute.Int("columnIndex", column),
852+
))
846853

847854
return &SyncIterator{
848855
table: strings.ToLower(rgs[0].Schema().Name()) + "s",
@@ -1012,11 +1019,12 @@ func (c *SyncIterator) seekPages(seekTo RowNumber, definitionLevel int) (done bo
10121019
return true, err
10131020
}
10141021
c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1)
1015-
c.span.LogFields(
1016-
log.String("msg", "reading page (seekPages)"),
1017-
log.Int64("page_num_values", pg.NumValues()),
1018-
log.Int64("page_size", pg.Size()),
1019-
)
1022+
c.span.AddEvent(
1023+
"read page (seekPages)",
1024+
trace.WithAttributes(
1025+
attribute.Int64("page_num_values", pg.NumValues()),
1026+
attribute.Int64("page_size", pg.Size()),
1027+
))
10201028

10211029
// Skip based on row number?
10221030
newRN := c.curr
@@ -1073,11 +1081,12 @@ func (c *SyncIterator) next() (RowNumber, *parquet.Value, error) {
10731081
return EmptyRowNumber(), nil, err
10741082
}
10751083
c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1)
1076-
c.span.LogFields(
1077-
log.String("msg", "reading page (next)"),
1078-
log.Int64("page_num_values", pg.NumValues()),
1079-
log.Int64("page_size", pg.Size()),
1080-
)
1084+
c.span.AddEvent(
1085+
"read page (next)",
1086+
trace.WithAttributes(
1087+
attribute.Int64("page_num_values", pg.NumValues()),
1088+
attribute.Int64("page_size", pg.Size()),
1089+
))
10811090

10821091
if c.filter != nil && !c.filter.KeepPage(pg) {
10831092
// This page filtered out
@@ -1195,12 +1204,14 @@ func (c *SyncIterator) Err() error {
11951204
func (c *SyncIterator) Close() error {
11961205
c.closeCurrRowGroup()
11971206

1198-
c.span.SetTag("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load())
1199-
c.span.SetTag("inspectedPages", c.filter.InspectedPages.Load())
1200-
c.span.SetTag("inspectedValues", c.filter.InspectedValues.Load())
1201-
c.span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks.Load())
1202-
c.span.SetTag("keptPages", c.filter.KeptPages.Load())
1203-
c.span.SetTag("keptValues", c.filter.KeptValues.Load())
1204-
c.span.Finish()
1207+
c.span.SetAttributes(attribute.Int64("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load()))
1208+
/*
1209+
c.span.SetTag("inspectedPages", c.filter.InspectedPages.Load())
1210+
c.span.SetTag("inspectedValues", c.filter.InspectedValues.Load())
1211+
c.span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks.Load())
1212+
c.span.SetTag("keptPages", c.filter.KeptPages.Load())
1213+
c.span.SetTag("keptValues", c.filter.KeptValues.Load())
1214+
*/
1215+
c.span.End()
12051216
return nil
12061217
}

pkg/phlaredb/query/iters_test.go

Lines changed: 22 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -348,14 +348,18 @@ func createFileWith[T any](t testing.TB, rows []T, rowGroups int) *parquet.File
348348
require.NoError(t, err)
349349
t.Logf("Created temp file %s", f.Name())
350350

351-
half := len(rows) / rowGroups
351+
perRG := len(rows) / rowGroups
352352

353353
w := parquet.NewGenericWriter[T](f)
354-
_, err = w.Write(rows[0:half])
355-
require.NoError(t, err)
356-
require.NoError(t, w.Flush())
357354

358-
_, err = w.Write(rows[half:])
355+
for i := 0; i < (rowGroups - 1); i++ {
356+
_, err = w.Write(rows[0:perRG])
357+
require.NoError(t, err)
358+
require.NoError(t, w.Flush())
359+
rows = rows[perRG:]
360+
}
361+
362+
_, err = w.Write(rows)
359363
require.NoError(t, err)
360364
require.NoError(t, w.Flush())
361365

@@ -380,18 +384,15 @@ type iteratorTracer struct {
380384

381385
func (i iteratorTracer) Next() bool {
382386
i.nextCount++
383-
//posBefore := i.it.At()
387+
posBefore := i.it.At()
384388
result := i.it.Next()
385-
//posAfter := i.it.At()
386-
/*
387-
i.span.AddAttributes.LogKV(
388-
"event", "next",
389-
"result", result,
390-
"column", i.name,
391-
"posBefore", posBefore,
392-
"posAfter", posAfter,
393-
)
394-
*/
389+
posAfter := i.it.At()
390+
i.span.AddEvent("next", trace.WithAttributes(
391+
attribute.String("column", i.name),
392+
attribute.Bool("result", result),
393+
attribute.Stringer("posBefore", posBefore),
394+
attribute.Stringer("posAfter", posAfter),
395+
))
395396
return result
396397
}
397398

@@ -412,22 +413,13 @@ func (i iteratorTracer) Seek(pos RowNumberWithDefinitionLevel) bool {
412413
posBefore := i.it.At()
413414
result := i.it.Seek(pos)
414415
posAfter := i.it.At()
415-
416416
i.span.AddEvent("seek", trace.WithAttributes(
417417
attribute.String("column", i.name),
418+
attribute.Bool("result", result),
419+
attribute.Stringer("seekTo", &pos),
418420
attribute.Stringer("posBefore", posBefore),
419421
attribute.Stringer("posAfter", posAfter),
420422
))
421-
/*
422-
i.span.LogKV(
423-
"event", "seek",
424-
"result", result,
425-
"column", i.name,
426-
"seekTo", pos,
427-
"posBefore", posBefore,
428-
"posAfter", posAfter,
429-
)
430-
*/
431423
return result
432424
}
433425

@@ -544,12 +536,9 @@ func TestBinaryJoinIterator(t *testing.T) {
544536

545537
results := 0
546538
for it.Next() {
547-
/*
548-
span.LogKV(
549-
"event", "match",
550-
"pos", it.At().RowNumber,
551-
)
552-
*/
539+
span.AddEvent("match", trace.WithAttributes(
540+
attribute.Stringer("element", it.At()),
541+
))
553542
results++
554543
}
555544
require.NoError(t, it.Err())

0 commit comments

Comments
 (0)