|
1 | 1 | import numpy as np |
2 | 2 | import pandas as pd |
3 | 3 | import pymc as pm |
| 4 | +import pytest |
4 | 5 |
|
5 | 6 | from numpy.testing import assert_allclose |
6 | 7 | from pytensor import config |
|
15 | 16 | RTOL = 0 if config.floatX.endswith("64") else 1e-6 |
16 | 17 |
|
17 | 18 |
|
18 | | -def test_exogenous_component(rng): |
19 | | - data = rng.normal(size=(100, 2)).astype(config.floatX) |
20 | | - mod = st.RegressionComponent(state_names=["feature_1", "feature_2"], name="exog") |
| 19 | +@pytest.fixture |
| 20 | +def regression_data(rng): |
| 21 | + """Generate test data for regression components (2 exogenous variables).""" |
| 22 | + return rng.normal(size=(100, 2)).astype(config.floatX) |
21 | 23 |
|
22 | | - params = {"beta_exog": np.array([1.0, 2.0], dtype=config.floatX)} |
23 | | - exog_data = {"data_exog": data} |
24 | | - x, y = simulate_from_numpy_model(mod, rng, params, exog_data) |
25 | 24 |
|
26 | | - # Check that the generated data is just a linear regression |
27 | | - assert_allclose(y, data @ params["beta_exog"], atol=ATOL, rtol=RTOL) |
| 25 | +@pytest.fixture |
| 26 | +def multiple_regression_data(rng): |
| 27 | + """Generate test data for multiple regression components.""" |
| 28 | + return { |
| 29 | + "data_1": rng.normal(size=(100, 2)).astype(config.floatX), |
| 30 | + "data_2": rng.normal(size=(100, 1)).astype(config.floatX), |
| 31 | + } |
28 | 32 |
|
29 | | - mod = mod.build(verbose=False) |
30 | | - _assert_basic_coords_correct(mod) |
31 | | - assert mod.coords["state_exog"] == ["feature_1", "feature_2"] |
32 | 33 |
|
33 | | - |
34 | | -def test_adding_exogenous_component(rng): |
35 | | - data = rng.normal(size=(100, 2)).astype(config.floatX) |
36 | | - reg = st.RegressionComponent(state_names=["a", "b"], name="exog") |
37 | | - ll = st.LevelTrendComponent(name="level") |
38 | | - |
39 | | - seasonal = st.FrequencySeasonality(name="annual", season_length=12, n=4) |
40 | | - mod = reg + ll + seasonal |
41 | | - |
42 | | - assert mod.ssm["design"].eval({"data_exog": data}).shape == (100, 1, 2 + 2 + 8) |
43 | | - assert_allclose(mod.ssm["design", 5, 0, :2].eval({"data_exog": data}), data[5]) |
44 | | - |
45 | | - |
46 | | -def test_regression_with_multiple_observed_states(rng): |
47 | | - from scipy.linalg import block_diag |
48 | | - |
49 | | - data = rng.normal(size=(100, 2)).astype(config.floatX) |
50 | | - mod = st.RegressionComponent( |
51 | | - state_names=["feature_1", "feature_2"], |
52 | | - name="exog", |
53 | | - observed_state_names=["data_1", "data_2"], |
54 | | - ) |
55 | | - |
56 | | - params = {"beta_exog": np.array([[1.0, 2.0], [3.0, 4.0]], dtype=config.floatX)} |
57 | | - exog_data = {"data_exog": data} |
58 | | - x, y = simulate_from_numpy_model(mod, rng, params, exog_data) |
59 | | - |
60 | | - assert x.shape == (100, 4) # 2 features, 2 states |
61 | | - assert y.shape == (100, 2) |
62 | | - |
63 | | - # Check that the generated data are two independent linear regressions |
64 | | - assert_allclose(y[:, 0], data @ params["beta_exog"][0], atol=ATOL, rtol=RTOL) |
65 | | - assert_allclose(y[:, 1], data @ params["beta_exog"][1], atol=ATOL, rtol=RTOL) |
66 | | - |
67 | | - mod = mod.build(verbose=False) |
68 | | - assert mod.coords["state_exog"] == [ |
69 | | - "feature_1", |
70 | | - "feature_2", |
71 | | - ] |
72 | | - |
73 | | - Z = mod.ssm["design"].eval({"data_exog": data}) |
74 | | - vec_block_diag = np.vectorize(block_diag, signature="(n,m),(o,p)->(q,r)") |
75 | | - assert Z.shape == (100, 2, 4) |
76 | | - assert np.allclose(Z, vec_block_diag(data[:, None, :], data[:, None, :])) |
77 | | - |
78 | | - |
79 | | -def test_add_regression_components_with_multiple_observed_states(rng): |
80 | | - from scipy.linalg import block_diag |
81 | | - |
82 | | - data_1 = rng.normal(size=(100, 2)).astype(config.floatX) |
83 | | - data_2 = rng.normal(size=(100, 1)).astype(config.floatX) |
84 | | - |
85 | | - reg1 = st.RegressionComponent( |
86 | | - state_names=["a", "b"], name="exog1", observed_state_names=["data_1", "data_2"] |
87 | | - ) |
88 | | - reg2 = st.RegressionComponent(state_names=["c"], name="exog2", observed_state_names=["data_3"]) |
89 | | - |
90 | | - mod = (reg1 + reg2).build(verbose=False) |
91 | | - assert mod.coords["state_exog1"] == ["a", "b"] |
92 | | - assert mod.coords["state_exog2"] == ["c"] |
93 | | - |
94 | | - Z = mod.ssm["design"].eval({"data_exog1": data_1, "data_exog2": data_2}) |
95 | | - vec_block_diag = np.vectorize(block_diag, signature="(n,m),(o,p)->(q,r)") |
96 | | - assert Z.shape == (100, 3, 5) |
97 | | - assert np.allclose( |
98 | | - Z, |
99 | | - vec_block_diag(vec_block_diag(data_1[:, None, :], data_1[:, None, :]), data_2[:, None, :]), |
100 | | - ) |
101 | | - |
102 | | - x0 = mod.ssm["initial_state"].eval( |
103 | | - { |
104 | | - "beta_exog1": np.array([[1.0, 2.0], [3.0, 4.0]], dtype=config.floatX), |
105 | | - "beta_exog2": np.array([5.0], dtype=config.floatX), |
106 | | - } |
107 | | - ) |
108 | | - np.testing.assert_allclose(x0, np.array([1.0, 2.0, 3.0, 4.0, 5.0], dtype=config.floatX)) |
109 | | - |
110 | | - |
111 | | -def test_filter_scans_time_varying_design_matrix(rng): |
| 34 | +@pytest.fixture |
| 35 | +def time_series_data(rng): |
| 36 | + """Generate time series data for PyMC integration tests.""" |
112 | 37 | time_idx = pd.date_range(start="2000-01-01", freq="D", periods=100) |
113 | 38 | data = pd.DataFrame(rng.normal(size=(100, 2)), columns=["a", "b"], index=time_idx) |
114 | | - |
115 | 39 | y = pd.DataFrame(rng.normal(size=(100, 1)), columns=["data"], index=time_idx) |
116 | | - |
117 | | - reg = st.RegressionComponent(state_names=["a", "b"], name="exog") |
118 | | - mod = reg.build(verbose=False) |
119 | | - |
120 | | - with pm.Model(coords=mod.coords) as m: |
121 | | - data_exog = pm.Data("data_exog", data.values) |
122 | | - |
123 | | - x0 = pm.Normal("x0", dims=["state"]) |
124 | | - P0 = pm.Deterministic("P0", pt.eye(mod.k_states), dims=["state", "state_aux"]) |
125 | | - beta_exog = pm.Normal("beta_exog", dims=["state_exog"]) |
126 | | - |
127 | | - mod.build_statespace_graph(y) |
128 | | - x0, P0, c, d, T, Z, R, H, Q = mod.unpack_statespace() |
129 | | - pm.Deterministic("Z", Z) |
130 | | - |
131 | | - prior = pm.sample_prior_predictive(draws=10) |
132 | | - |
133 | | - prior_Z = prior.prior.Z.values |
134 | | - assert prior_Z.shape == (1, 10, 100, 1, 2) |
135 | | - assert_allclose(prior_Z[0, :, :, 0, :], data.values[None].repeat(10, axis=0)) |
| 40 | + return data, y |
| 41 | + |
| 42 | + |
| 43 | +class TestRegressionComponent: |
| 44 | + """Test basic regression component functionality.""" |
| 45 | + |
| 46 | + @pytest.mark.parametrize("innovations", [False, True]) |
| 47 | + def test_exogenous_component(self, rng, regression_data, innovations): |
| 48 | + """Test basic regression component with and without innovations.""" |
| 49 | + mod = st.RegressionComponent( |
| 50 | + state_names=["feature_1", "feature_2"], name="exog", innovations=innovations |
| 51 | + ) |
| 52 | + |
| 53 | + params = {"beta_exog": np.array([1.0, 2.0], dtype=config.floatX)} |
| 54 | + if innovations: |
| 55 | + params["sigma_beta_exog"] = np.array([0.1, 0.2], dtype=config.floatX) |
| 56 | + |
| 57 | + exog_data = {"data_exog": regression_data} |
| 58 | + x, y = simulate_from_numpy_model(mod, rng, params, exog_data) |
| 59 | + |
| 60 | + if not innovations: |
| 61 | + # Check that the generated data is just a linear regression |
| 62 | + assert_allclose(y, regression_data @ params["beta_exog"], atol=ATOL, rtol=RTOL) |
| 63 | + else: |
| 64 | + # With innovations, the coefficients should vary over time |
| 65 | + # The initial state should match the beta parameters |
| 66 | + assert_allclose(x[0], params["beta_exog"], atol=ATOL, rtol=RTOL) |
| 67 | + |
| 68 | + mod = mod.build(verbose=False) |
| 69 | + _assert_basic_coords_correct(mod) |
| 70 | + assert mod.coords["state_exog"] == ["feature_1", "feature_2"] |
| 71 | + |
| 72 | + if innovations: |
| 73 | + # Check that sigma_beta parameter is included |
| 74 | + assert "sigma_beta_exog" in mod.param_names |
| 75 | + |
| 76 | + @pytest.mark.parametrize("innovations", [False, True]) |
| 77 | + def test_adding_exogenous_component(self, rng, regression_data, innovations): |
| 78 | + """Test adding regression component to other components.""" |
| 79 | + reg = st.RegressionComponent(state_names=["a", "b"], name="exog", innovations=innovations) |
| 80 | + ll = st.LevelTrendComponent(name="level") |
| 81 | + seasonal = st.FrequencySeasonality(name="annual", season_length=12, n=4) |
| 82 | + mod = reg + ll + seasonal |
| 83 | + |
| 84 | + assert mod.ssm["design"].eval({"data_exog": regression_data}).shape == (100, 1, 2 + 2 + 8) |
| 85 | + assert_allclose( |
| 86 | + mod.ssm["design", 5, 0, :2].eval({"data_exog": regression_data}), regression_data[5] |
| 87 | + ) |
| 88 | + |
| 89 | + if innovations: |
| 90 | + # Check that sigma_beta parameter is included in the combined model |
| 91 | + assert "sigma_beta_exog" in mod.param_names |
| 92 | + |
| 93 | + |
| 94 | +class TestMultivariateRegression: |
| 95 | + """Test multivariate regression functionality.""" |
| 96 | + |
| 97 | + @pytest.mark.parametrize("innovations", [False, True]) |
| 98 | + def test_regression_with_multiple_observed_states(self, rng, regression_data, innovations): |
| 99 | + """Test multivariate regression with and without innovations.""" |
| 100 | + from scipy.linalg import block_diag |
| 101 | + |
| 102 | + mod = st.RegressionComponent( |
| 103 | + state_names=["feature_1", "feature_2"], |
| 104 | + name="exog", |
| 105 | + observed_state_names=["data_1", "data_2"], |
| 106 | + innovations=innovations, |
| 107 | + ) |
| 108 | + |
| 109 | + params = {"beta_exog": np.array([[1.0, 2.0], [3.0, 4.0]], dtype=config.floatX)} |
| 110 | + if innovations: |
| 111 | + params["sigma_beta_exog"] = np.array([[0.1, 0.2], [0.3, 0.4]], dtype=config.floatX) |
| 112 | + |
| 113 | + exog_data = {"data_exog": regression_data} |
| 114 | + x, y = simulate_from_numpy_model(mod, rng, params, exog_data) |
| 115 | + |
| 116 | + assert x.shape == (100, 4) # 2 features, 2 states |
| 117 | + assert y.shape == (100, 2) |
| 118 | + |
| 119 | + if not innovations: |
| 120 | + # Check that the generated data are two independent linear regressions |
| 121 | + assert_allclose(y[:, 0], regression_data @ params["beta_exog"][0], atol=ATOL, rtol=RTOL) |
| 122 | + assert_allclose(y[:, 1], regression_data @ params["beta_exog"][1], atol=ATOL, rtol=RTOL) |
| 123 | + else: |
| 124 | + # Check that initial states match the beta parameters |
| 125 | + assert_allclose(x[0, :2], params["beta_exog"][0], atol=ATOL, rtol=RTOL) |
| 126 | + assert_allclose(x[0, 2:], params["beta_exog"][1], atol=ATOL, rtol=RTOL) |
| 127 | + |
| 128 | + mod = mod.build(verbose=False) |
| 129 | + assert mod.coords["state_exog"] == ["feature_1", "feature_2"] |
| 130 | + |
| 131 | + Z = mod.ssm["design"].eval({"data_exog": regression_data}) |
| 132 | + vec_block_diag = np.vectorize(block_diag, signature="(n,m),(o,p)->(q,r)") |
| 133 | + assert Z.shape == (100, 2, 4) |
| 134 | + assert np.allclose( |
| 135 | + Z, |
| 136 | + vec_block_diag(regression_data[:, None, :], regression_data[:, None, :]), |
| 137 | + ) |
| 138 | + |
| 139 | + if innovations: |
| 140 | + # Check that sigma_beta parameter is included |
| 141 | + assert "sigma_beta_exog" in mod.param_names |
| 142 | + |
| 143 | + |
| 144 | +class TestMultipleRegressionComponents: |
| 145 | + """Test multiple regression components functionality.""" |
| 146 | + |
| 147 | + @pytest.mark.parametrize("innovations", [False, True]) |
| 148 | + def test_add_regression_components_with_multiple_observed_states( |
| 149 | + self, rng, multiple_regression_data, innovations |
| 150 | + ): |
| 151 | + """Test adding multiple regression components with and without innovations.""" |
| 152 | + from scipy.linalg import block_diag |
| 153 | + |
| 154 | + reg1 = st.RegressionComponent( |
| 155 | + state_names=["a", "b"], |
| 156 | + name="exog1", |
| 157 | + observed_state_names=["data_1", "data_2"], |
| 158 | + innovations=innovations, |
| 159 | + ) |
| 160 | + reg2 = st.RegressionComponent( |
| 161 | + state_names=["c"], |
| 162 | + name="exog2", |
| 163 | + observed_state_names=["data_3"], |
| 164 | + innovations=innovations, |
| 165 | + ) |
| 166 | + |
| 167 | + mod = (reg1 + reg2).build(verbose=False) |
| 168 | + assert mod.coords["state_exog1"] == ["a", "b"] |
| 169 | + assert mod.coords["state_exog2"] == ["c"] |
| 170 | + |
| 171 | + Z = mod.ssm["design"].eval( |
| 172 | + { |
| 173 | + "data_exog1": multiple_regression_data["data_1"], |
| 174 | + "data_exog2": multiple_regression_data["data_2"], |
| 175 | + } |
| 176 | + ) |
| 177 | + vec_block_diag = np.vectorize(block_diag, signature="(n,m),(o,p)->(q,r)") |
| 178 | + assert Z.shape == (100, 3, 5) |
| 179 | + assert np.allclose( |
| 180 | + Z, |
| 181 | + vec_block_diag( |
| 182 | + vec_block_diag( |
| 183 | + multiple_regression_data["data_1"][:, None, :], |
| 184 | + multiple_regression_data["data_1"][:, None, :], |
| 185 | + ), |
| 186 | + multiple_regression_data["data_2"][:, None, :], |
| 187 | + ), |
| 188 | + ) |
| 189 | + |
| 190 | + x0 = mod.ssm["initial_state"].eval( |
| 191 | + { |
| 192 | + "beta_exog1": np.array([[1.0, 2.0], [3.0, 4.0]], dtype=config.floatX), |
| 193 | + "beta_exog2": np.array([5.0], dtype=config.floatX), |
| 194 | + } |
| 195 | + ) |
| 196 | + np.testing.assert_allclose(x0, np.array([1.0, 2.0, 3.0, 4.0, 5.0], dtype=config.floatX)) |
| 197 | + |
| 198 | + if innovations: |
| 199 | + # Check that sigma_beta parameters are included |
| 200 | + assert "sigma_beta_exog1" in mod.param_names |
| 201 | + assert "sigma_beta_exog2" in mod.param_names |
| 202 | + |
| 203 | + |
| 204 | +class TestPyMCIntegration: |
| 205 | + """Test PyMC integration functionality.""" |
| 206 | + |
| 207 | + @pytest.mark.parametrize("innovations", [False, True]) |
| 208 | + def test_filter_scans_time_varying_design_matrix(self, rng, time_series_data, innovations): |
| 209 | + """Test PyMC integration with and without innovations.""" |
| 210 | + data, y = time_series_data |
| 211 | + |
| 212 | + reg = st.RegressionComponent(state_names=["a", "b"], name="exog", innovations=innovations) |
| 213 | + mod = reg.build(verbose=False) |
| 214 | + |
| 215 | + with pm.Model(coords=mod.coords) as m: |
| 216 | + data_exog = pm.Data("data_exog", data.values) |
| 217 | + |
| 218 | + x0 = pm.Normal("x0", dims=["state"]) |
| 219 | + P0 = pm.Deterministic("P0", pt.eye(mod.k_states), dims=["state", "state_aux"]) |
| 220 | + beta_exog = pm.Normal("beta_exog", dims=["state_exog"]) |
| 221 | + |
| 222 | + if innovations: |
| 223 | + sigma_beta_exog = pm.Exponential("sigma_beta_exog", 1, dims=["state_exog"]) |
| 224 | + |
| 225 | + mod.build_statespace_graph(y) |
| 226 | + x0, P0, c, d, T, Z, R, H, Q = mod.unpack_statespace() |
| 227 | + pm.Deterministic("Z", Z) |
| 228 | + |
| 229 | + prior = pm.sample_prior_predictive(draws=10) |
| 230 | + |
| 231 | + prior_Z = prior.prior.Z.values |
| 232 | + assert prior_Z.shape == (1, 10, 100, 1, 2) |
| 233 | + assert_allclose(prior_Z[0, :, :, 0, :], data.values[None].repeat(10, axis=0)) |
| 234 | + |
| 235 | + if innovations: |
| 236 | + # Check that sigma_beta parameter is included in the prior |
| 237 | + assert "sigma_beta_exog" in prior.prior.data_vars |
0 commit comments