Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions dbos/conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ func (c *conductor) handleMessage(data []byte) error {
return c.handleExistPendingWorkflowsRequest(data, base.RequestID)
case retentionMessage:
return c.handleRetentionRequest(data, base.RequestID)
case getMetricsMessage:
return c.handleGetMetricsRequest(data, base.RequestID)
default:
c.logger.Warn("Unknown message type", "type", base.Type)
return c.handleUnknownMessageType(base.RequestID, base.Type, "Unknown message type")
Expand Down Expand Up @@ -374,6 +376,8 @@ func (c *conductor) handleExecutorInfoRequest(data []byte, requestID string) err
ExecutorID: c.dbosCtx.GetExecutorID(),
ApplicationVersion: c.dbosCtx.GetApplicationVersion(),
Hostname: &hostname,
DBOSVersion: getDBOSVersion(),
Language: "go",
}

return c.sendResponse(response, string(executorInfo))
Expand Down Expand Up @@ -551,6 +555,49 @@ func (c *conductor) handleRetentionRequest(data []byte, requestID string) error
return c.sendResponse(response, string(retentionMessage))
}

func (c *conductor) handleGetMetricsRequest(data []byte, requestID string) error {
var req getMetricsConductorRequest
if err := json.Unmarshal(data, &req); err != nil {
c.logger.Error("Failed to parse get metrics request", "error", err)
return fmt.Errorf("failed to parse get metrics request: %w", err)
}
c.logger.Debug("Handling get metrics request",
"start_time", req.StartTime,
"end_time", req.EndTime,
"metric_class", req.MetricClass,
"request_id", requestID)

var errorMsg *string
var metricsData []metricData

if req.MetricClass == "workflow_step_count" {
var err error
metricsData, err = c.dbosCtx.systemDB.getMetrics(c.dbosCtx, req.StartTime, req.EndTime)
if err != nil {
c.logger.Error("Failed to get metrics", "error", err)
errStr := fmt.Sprintf("Exception encountered when getting metrics: %v", err)
errorMsg = &errStr
}
} else {
errStr := fmt.Sprintf("Unexpected metric class: %s", req.MetricClass)
errorMsg = &errStr
c.logger.Warn("Unexpected metric class", "metric_class", req.MetricClass)
}

response := getMetricsConductorResponse{
baseResponse: baseResponse{
baseMessage: baseMessage{
Type: getMetricsMessage,
RequestID: requestID,
},
ErrorMessage: errorMsg,
},
Metrics: metricsData,
}

return c.sendResponse(response, string(getMetricsMessage))
}

func (c *conductor) handleListWorkflowsRequest(data []byte, requestID string) error {
var req listWorkflowsConductorRequest
if err := json.Unmarshal(data, &req); err != nil {
Expand Down
17 changes: 17 additions & 0 deletions dbos/conductor_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
forkWorkflowMessage messageType = "fork_workflow"
existPendingWorkflowsMessage messageType = "exist_pending_workflows"
retentionMessage messageType = "retention"
getMetricsMessage messageType = "get_metrics"
)

// baseMessage represents the common structure of all conductor messages
Expand All @@ -46,6 +47,8 @@ type executorInfoResponse struct {
ExecutorID string `json:"executor_id"`
ApplicationVersion string `json:"application_version"`
Hostname *string `json:"hostname,omitempty"`
DBOSVersion string `json:"dbos_version"`
Language string `json:"language"`
}

// listWorkflowsConductorRequestBody contains filter parameters for listing workflows
Expand Down Expand Up @@ -378,3 +381,17 @@ type retentionConductorResponse struct {
baseResponse
Success bool `json:"success"`
}

// getMetricsConductorRequest is sent by the conductor to request metrics
type getMetricsConductorRequest struct {
baseMessage
StartTime string `json:"start_time"`
EndTime string `json:"end_time"`
MetricClass string `json:"metric_class"`
}

// getMetricsConductorResponse is sent in response to metrics requests
type getMetricsConductorResponse struct {
baseResponse
Metrics []metricData `json:"metrics"`
}
130 changes: 130 additions & 0 deletions dbos/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package dbos

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestGetMetrics(t *testing.T) {
dbosCtx := setupDBOS(t, true, true)
defer Shutdown(dbosCtx, 1*time.Minute)

// Get the internal systemDB instance
sysDB, ok := dbosCtx.(*dbosContext)
require.True(t, ok, "expected dbosContext")
require.NotNil(t, sysDB.systemDB)

// Define test workflows
testWorkflowA := func(ctx DBOSContext, input string) (string, error) {
_, err := RunAsStep(ctx, func(_ context.Context) (string, error) {
return "x", nil
}, WithStepName("testStepX"))
if err != nil {
return "", err
}
_, err = RunAsStep(ctx, func(_ context.Context) (string, error) {
return "x", nil
}, WithStepName("testStepX"))
if err != nil {
return "", err
}
return "a", nil
}

testWorkflowB := func(ctx DBOSContext, input string) (string, error) {
_, err := RunAsStep(ctx, func(_ context.Context) (string, error) {
return "y", nil
}, WithStepName("testStepY"))
if err != nil {
return "", err
}
return "b", nil
}

// Register workflows with custom names
RegisterWorkflow(dbosCtx, testWorkflowA, WithWorkflowName("testWorkflowA"))
RegisterWorkflow(dbosCtx, testWorkflowB, WithWorkflowName("testWorkflowB"))

// Record start time before creating workflows
startTime := time.Now()

// Execute workflows to create metrics data
handle1, err := RunWorkflow(dbosCtx, testWorkflowA, "input1")
require.NoError(t, err)
_, err = handle1.GetResult()
require.NoError(t, err)

handle2, err := RunWorkflow(dbosCtx, testWorkflowA, "input2")
require.NoError(t, err)
_, err = handle2.GetResult()
require.NoError(t, err)

handle3, err := RunWorkflow(dbosCtx, testWorkflowB, "input3")
require.NoError(t, err)
_, err = handle3.GetResult()
require.NoError(t, err)

// Query metrics from start to now + 10 hours
endTime := time.Now().Add(10 * time.Hour)
metrics, err := sysDB.systemDB.getMetrics(context.Background(), startTime.Format(time.RFC3339), endTime.Format(time.RFC3339))
require.NoError(t, err)
assert.GreaterOrEqual(t, len(metrics), 4, "Expected at least 4 metrics (2 workflow counts + 2 step counts)")

// Convert to map for easier assertion
metricsMap := make(map[string]float64)
for _, m := range metrics {
key := m.MetricType + ":" + m.MetricName
metricsMap[key] = m.Value
}

// Verify workflow counts
workflowCountAFound := false
workflowCountBFound := false
stepCountXFound := false
stepCountYFound := false

for _, m := range metrics {
if m.MetricType == "workflow_count" && m.MetricName == "testWorkflowA" {
workflowCountAFound = true
assert.Equal(t, float64(2), m.Value, "testWorkflowA should have 2 executions")
}
if m.MetricType == "workflow_count" && m.MetricName == "testWorkflowB" {
workflowCountBFound = true
assert.Equal(t, float64(1), m.Value, "testWorkflowB should have 1 execution")
}
if m.MetricType == "step_count" && m.MetricName == "testStepX" {
stepCountXFound = true
assert.Equal(t, float64(4), m.Value, "testStepX should have 4 executions (2 workflows * 2 steps)")
}
if m.MetricType == "step_count" && m.MetricName == "testStepY" {
stepCountYFound = true
assert.Equal(t, float64(1), m.Value, "testStepY should have 1 execution")
}
}

assert.True(t, workflowCountAFound, "Should have testWorkflowA workflow_count metric")
assert.True(t, workflowCountBFound, "Should have testWorkflowB workflow_count metric")
assert.True(t, stepCountXFound, "Should have testStepX step_count metric")
assert.True(t, stepCountYFound, "Should have testStepY step_count metric")
}

func TestGetMetricsEmptyTimeRange(t *testing.T) {
dbosCtx := setupDBOS(t, true, true)
defer Shutdown(dbosCtx, 1*time.Minute)

sysDB, ok := dbosCtx.(*dbosContext)
require.True(t, ok, "expected dbosContext")
require.NotNil(t, sysDB.systemDB)

// Query metrics for a time range with no data
futureTime := time.Now().Add(24 * time.Hour)
futureTime2 := futureTime.Add(1 * time.Hour)

metrics, err := sysDB.systemDB.getMetrics(context.Background(), futureTime.Format(time.RFC3339), futureTime2.Format(time.RFC3339))
require.NoError(t, err)
assert.Equal(t, 0, len(metrics), "Should return empty metrics for future time range")
}
115 changes: 115 additions & 0 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type systemDatabase interface {

// Garbage collection
garbageCollectWorkflows(ctx context.Context, input garbageCollectWorkflowsInput) error

// Metrics
getMetrics(ctx context.Context, startTime string, endTime string) ([]metricData, error)
}

type sysDB struct {
Expand Down Expand Up @@ -2533,6 +2536,118 @@ func (s *sysDB) getQueuePartitions(ctx context.Context, queueName string) ([]str
return partitions, nil
}

/*******************************/
/******* METRICS ********/
/*******************************/

type metricData struct {
MetricName string `json:"metric_name"` // step name or workflow name
MetricType string `json:"metric_type"` // workflow_count, step_count, etc
Value float64 `json:"value"`
}

func (s *sysDB) getMetrics(ctx context.Context, startTime, endTime string) ([]metricData, error) {
// Parse ISO timestamp strings to time.Time
startTimeParsed, err := time.Parse(time.RFC3339, startTime)
if err != nil {
return nil, fmt.Errorf("invalid start_time format: %w", err)
}
endTimeParsed, err := time.Parse(time.RFC3339, endTime)
if err != nil {
return nil, fmt.Errorf("invalid end_time format: %w", err)
}

// Convert to epoch milliseconds
startEpochMs := startTimeParsed.UnixMilli()
endEpochMs := endTimeParsed.UnixMilli()

var metrics []metricData

// Query workflow metrics
workflowMetrics, err := s.getMetricWorkflowCount(ctx, startEpochMs, endEpochMs)
if err != nil {
return nil, err
}
metrics = append(metrics, workflowMetrics...)

// Query step metrics
stepMetrics, err := s.getMetricStepCount(ctx, startEpochMs, endEpochMs)
if err != nil {
return nil, err
}
metrics = append(metrics, stepMetrics...)

return metrics, nil
}

func (s *sysDB) getMetricWorkflowCount(ctx context.Context, startEpochMs, endEpochMs int64) ([]metricData, error) {
workflowQuery := fmt.Sprintf(`
SELECT name, COUNT(workflow_uuid) as count
FROM %s.workflow_status
WHERE created_at >= $1 AND created_at < $2
GROUP BY name
`, pgx.Identifier{s.schema}.Sanitize())

rows, err := s.pool.Query(ctx, workflowQuery, startEpochMs, endEpochMs)
if err != nil {
return nil, fmt.Errorf("failed to query workflow metrics: %w", err)
}
defer rows.Close()

var metrics []metricData
for rows.Next() {
var workflowName string
var workflowCount int64
if err := rows.Scan(&workflowName, &workflowCount); err != nil {
return nil, fmt.Errorf("failed to scan workflow metric: %w", err)
}
metrics = append(metrics, metricData{
MetricType: "workflow_count",
MetricName: workflowName,
Value: float64(workflowCount),
})
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating workflow metrics: %w", err)
}

return metrics, nil
}

func (s *sysDB) getMetricStepCount(ctx context.Context, startEpochMs, endEpochMs int64) ([]metricData, error) {
stepQuery := fmt.Sprintf(`
SELECT function_name, COUNT(*) as count
FROM %s.operation_outputs
WHERE completed_at_epoch_ms >= $1 AND completed_at_epoch_ms < $2
GROUP BY function_name
`, pgx.Identifier{s.schema}.Sanitize())

rows, err := s.pool.Query(ctx, stepQuery, startEpochMs, endEpochMs)
if err != nil {
return nil, fmt.Errorf("failed to query step metrics: %w", err)
}
defer rows.Close()

var metrics []metricData
for rows.Next() {
var stepName string
var stepCount int64
if err := rows.Scan(&stepName, &stepCount); err != nil {
return nil, fmt.Errorf("failed to scan step metric: %w", err)
}
metrics = append(metrics, metricData{
MetricType: "step_count",
MetricName: stepName,
Value: float64(stepCount),
})
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating step metrics: %w", err)
}

return metrics, nil
}

/*******************************/
/******* UTILS ********/
/*******************************/
Expand Down