diff --git a/dbos/conductor.go b/dbos/conductor.go index c12166e..7b7bf19 100644 --- a/dbos/conductor.go +++ b/dbos/conductor.go @@ -344,6 +344,8 @@ func (c *conductor) handleMessage(data []byte) error { return c.handleExistPendingWorkflowsRequest(data, base.RequestID) case retentionMessage: return c.handleRetentionRequest(data, base.RequestID) + case getMetricsMessage: + return c.handleGetMetricsRequest(data, base.RequestID) default: c.logger.Warn("Unknown message type", "type", base.Type) return c.handleUnknownMessageType(base.RequestID, base.Type, "Unknown message type") @@ -374,6 +376,8 @@ func (c *conductor) handleExecutorInfoRequest(data []byte, requestID string) err ExecutorID: c.dbosCtx.GetExecutorID(), ApplicationVersion: c.dbosCtx.GetApplicationVersion(), Hostname: &hostname, + DBOSVersion: getDBOSVersion(), + Language: "go", } return c.sendResponse(response, string(executorInfo)) @@ -551,6 +555,49 @@ func (c *conductor) handleRetentionRequest(data []byte, requestID string) error return c.sendResponse(response, string(retentionMessage)) } +func (c *conductor) handleGetMetricsRequest(data []byte, requestID string) error { + var req getMetricsConductorRequest + if err := json.Unmarshal(data, &req); err != nil { + c.logger.Error("Failed to parse get metrics request", "error", err) + return fmt.Errorf("failed to parse get metrics request: %w", err) + } + c.logger.Debug("Handling get metrics request", + "start_time", req.StartTime, + "end_time", req.EndTime, + "metric_class", req.MetricClass, + "request_id", requestID) + + var errorMsg *string + var metricsData []metricData + + if req.MetricClass == "workflow_step_count" { + var err error + metricsData, err = c.dbosCtx.systemDB.getMetrics(c.dbosCtx, req.StartTime, req.EndTime) + if err != nil { + c.logger.Error("Failed to get metrics", "error", err) + errStr := fmt.Sprintf("Exception encountered when getting metrics: %v", err) + errorMsg = &errStr + } + } else { + errStr := fmt.Sprintf("Unexpected metric class: %s", req.MetricClass) + errorMsg = &errStr + c.logger.Warn("Unexpected metric class", "metric_class", req.MetricClass) + } + + response := getMetricsConductorResponse{ + baseResponse: baseResponse{ + baseMessage: baseMessage{ + Type: getMetricsMessage, + RequestID: requestID, + }, + ErrorMessage: errorMsg, + }, + Metrics: metricsData, + } + + return c.sendResponse(response, string(getMetricsMessage)) +} + func (c *conductor) handleListWorkflowsRequest(data []byte, requestID string) error { var req listWorkflowsConductorRequest if err := json.Unmarshal(data, &req); err != nil { diff --git a/dbos/conductor_protocol.go b/dbos/conductor_protocol.go index ecde747..1fe58a7 100644 --- a/dbos/conductor_protocol.go +++ b/dbos/conductor_protocol.go @@ -21,6 +21,7 @@ const ( forkWorkflowMessage messageType = "fork_workflow" existPendingWorkflowsMessage messageType = "exist_pending_workflows" retentionMessage messageType = "retention" + getMetricsMessage messageType = "get_metrics" ) // baseMessage represents the common structure of all conductor messages @@ -46,6 +47,8 @@ type executorInfoResponse struct { ExecutorID string `json:"executor_id"` ApplicationVersion string `json:"application_version"` Hostname *string `json:"hostname,omitempty"` + DBOSVersion string `json:"dbos_version"` + Language string `json:"language"` } // listWorkflowsConductorRequestBody contains filter parameters for listing workflows @@ -378,3 +381,17 @@ type retentionConductorResponse struct { baseResponse Success bool `json:"success"` } + +// getMetricsConductorRequest is sent by the conductor to request metrics +type getMetricsConductorRequest struct { + baseMessage + StartTime string `json:"start_time"` + EndTime string `json:"end_time"` + MetricClass string `json:"metric_class"` +} + +// getMetricsConductorResponse is sent in response to metrics requests +type getMetricsConductorResponse struct { + baseResponse + Metrics []metricData `json:"metrics"` +} diff --git a/dbos/metrics_test.go b/dbos/metrics_test.go new file mode 100644 index 0000000..e18e429 --- /dev/null +++ b/dbos/metrics_test.go @@ -0,0 +1,130 @@ +package dbos + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGetMetrics(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + defer Shutdown(dbosCtx, 1*time.Minute) + + // Get the internal systemDB instance + sysDB, ok := dbosCtx.(*dbosContext) + require.True(t, ok, "expected dbosContext") + require.NotNil(t, sysDB.systemDB) + + // Define test workflows + testWorkflowA := func(ctx DBOSContext, input string) (string, error) { + _, err := RunAsStep(ctx, func(_ context.Context) (string, error) { + return "x", nil + }, WithStepName("testStepX")) + if err != nil { + return "", err + } + _, err = RunAsStep(ctx, func(_ context.Context) (string, error) { + return "x", nil + }, WithStepName("testStepX")) + if err != nil { + return "", err + } + return "a", nil + } + + testWorkflowB := func(ctx DBOSContext, input string) (string, error) { + _, err := RunAsStep(ctx, func(_ context.Context) (string, error) { + return "y", nil + }, WithStepName("testStepY")) + if err != nil { + return "", err + } + return "b", nil + } + + // Register workflows with custom names + RegisterWorkflow(dbosCtx, testWorkflowA, WithWorkflowName("testWorkflowA")) + RegisterWorkflow(dbosCtx, testWorkflowB, WithWorkflowName("testWorkflowB")) + + // Record start time before creating workflows + startTime := time.Now() + + // Execute workflows to create metrics data + handle1, err := RunWorkflow(dbosCtx, testWorkflowA, "input1") + require.NoError(t, err) + _, err = handle1.GetResult() + require.NoError(t, err) + + handle2, err := RunWorkflow(dbosCtx, testWorkflowA, "input2") + require.NoError(t, err) + _, err = handle2.GetResult() + require.NoError(t, err) + + handle3, err := RunWorkflow(dbosCtx, testWorkflowB, "input3") + require.NoError(t, err) + _, err = handle3.GetResult() + require.NoError(t, err) + + // Query metrics from start to now + 10 hours + endTime := time.Now().Add(10 * time.Hour) + metrics, err := sysDB.systemDB.getMetrics(context.Background(), startTime.Format(time.RFC3339), endTime.Format(time.RFC3339)) + require.NoError(t, err) + assert.GreaterOrEqual(t, len(metrics), 4, "Expected at least 4 metrics (2 workflow counts + 2 step counts)") + + // Convert to map for easier assertion + metricsMap := make(map[string]float64) + for _, m := range metrics { + key := m.MetricType + ":" + m.MetricName + metricsMap[key] = m.Value + } + + // Verify workflow counts + workflowCountAFound := false + workflowCountBFound := false + stepCountXFound := false + stepCountYFound := false + + for _, m := range metrics { + if m.MetricType == "workflow_count" && m.MetricName == "testWorkflowA" { + workflowCountAFound = true + assert.Equal(t, float64(2), m.Value, "testWorkflowA should have 2 executions") + } + if m.MetricType == "workflow_count" && m.MetricName == "testWorkflowB" { + workflowCountBFound = true + assert.Equal(t, float64(1), m.Value, "testWorkflowB should have 1 execution") + } + if m.MetricType == "step_count" && m.MetricName == "testStepX" { + stepCountXFound = true + assert.Equal(t, float64(4), m.Value, "testStepX should have 4 executions (2 workflows * 2 steps)") + } + if m.MetricType == "step_count" && m.MetricName == "testStepY" { + stepCountYFound = true + assert.Equal(t, float64(1), m.Value, "testStepY should have 1 execution") + } + } + + assert.True(t, workflowCountAFound, "Should have testWorkflowA workflow_count metric") + assert.True(t, workflowCountBFound, "Should have testWorkflowB workflow_count metric") + assert.True(t, stepCountXFound, "Should have testStepX step_count metric") + assert.True(t, stepCountYFound, "Should have testStepY step_count metric") +} + +func TestGetMetricsEmptyTimeRange(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + defer Shutdown(dbosCtx, 1*time.Minute) + + sysDB, ok := dbosCtx.(*dbosContext) + require.True(t, ok, "expected dbosContext") + require.NotNil(t, sysDB.systemDB) + + // Query metrics for a time range with no data + futureTime := time.Now().Add(24 * time.Hour) + futureTime2 := futureTime.Add(1 * time.Hour) + + metrics, err := sysDB.systemDB.getMetrics(context.Background(), futureTime.Format(time.RFC3339), futureTime2.Format(time.RFC3339)) + require.NoError(t, err) + assert.Equal(t, 0, len(metrics), "Should return empty metrics for future time range") +} diff --git a/dbos/system_database.go b/dbos/system_database.go index 6424ea2..16ed96b 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -70,6 +70,9 @@ type systemDatabase interface { // Garbage collection garbageCollectWorkflows(ctx context.Context, input garbageCollectWorkflowsInput) error + + // Metrics + getMetrics(ctx context.Context, startTime string, endTime string) ([]metricData, error) } type sysDB struct { @@ -2533,6 +2536,118 @@ func (s *sysDB) getQueuePartitions(ctx context.Context, queueName string) ([]str return partitions, nil } +/*******************************/ +/******* METRICS ********/ +/*******************************/ + +type metricData struct { + MetricName string `json:"metric_name"` // step name or workflow name + MetricType string `json:"metric_type"` // workflow_count, step_count, etc + Value float64 `json:"value"` +} + +func (s *sysDB) getMetrics(ctx context.Context, startTime, endTime string) ([]metricData, error) { + // Parse ISO timestamp strings to time.Time + startTimeParsed, err := time.Parse(time.RFC3339, startTime) + if err != nil { + return nil, fmt.Errorf("invalid start_time format: %w", err) + } + endTimeParsed, err := time.Parse(time.RFC3339, endTime) + if err != nil { + return nil, fmt.Errorf("invalid end_time format: %w", err) + } + + // Convert to epoch milliseconds + startEpochMs := startTimeParsed.UnixMilli() + endEpochMs := endTimeParsed.UnixMilli() + + var metrics []metricData + + // Query workflow metrics + workflowMetrics, err := s.getMetricWorkflowCount(ctx, startEpochMs, endEpochMs) + if err != nil { + return nil, err + } + metrics = append(metrics, workflowMetrics...) + + // Query step metrics + stepMetrics, err := s.getMetricStepCount(ctx, startEpochMs, endEpochMs) + if err != nil { + return nil, err + } + metrics = append(metrics, stepMetrics...) + + return metrics, nil +} + +func (s *sysDB) getMetricWorkflowCount(ctx context.Context, startEpochMs, endEpochMs int64) ([]metricData, error) { + workflowQuery := fmt.Sprintf(` + SELECT name, COUNT(workflow_uuid) as count + FROM %s.workflow_status + WHERE created_at >= $1 AND created_at < $2 + GROUP BY name + `, pgx.Identifier{s.schema}.Sanitize()) + + rows, err := s.pool.Query(ctx, workflowQuery, startEpochMs, endEpochMs) + if err != nil { + return nil, fmt.Errorf("failed to query workflow metrics: %w", err) + } + defer rows.Close() + + var metrics []metricData + for rows.Next() { + var workflowName string + var workflowCount int64 + if err := rows.Scan(&workflowName, &workflowCount); err != nil { + return nil, fmt.Errorf("failed to scan workflow metric: %w", err) + } + metrics = append(metrics, metricData{ + MetricType: "workflow_count", + MetricName: workflowName, + Value: float64(workflowCount), + }) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating workflow metrics: %w", err) + } + + return metrics, nil +} + +func (s *sysDB) getMetricStepCount(ctx context.Context, startEpochMs, endEpochMs int64) ([]metricData, error) { + stepQuery := fmt.Sprintf(` + SELECT function_name, COUNT(*) as count + FROM %s.operation_outputs + WHERE completed_at_epoch_ms >= $1 AND completed_at_epoch_ms < $2 + GROUP BY function_name + `, pgx.Identifier{s.schema}.Sanitize()) + + rows, err := s.pool.Query(ctx, stepQuery, startEpochMs, endEpochMs) + if err != nil { + return nil, fmt.Errorf("failed to query step metrics: %w", err) + } + defer rows.Close() + + var metrics []metricData + for rows.Next() { + var stepName string + var stepCount int64 + if err := rows.Scan(&stepName, &stepCount); err != nil { + return nil, fmt.Errorf("failed to scan step metric: %w", err) + } + metrics = append(metrics, metricData{ + MetricType: "step_count", + MetricName: stepName, + Value: float64(stepCount), + }) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating step metrics: %w", err) + } + + return metrics, nil +} + /*******************************/ /******* UTILS ********/ /*******************************/