Skip to content

Commit c6a09d3

Browse files
authored
ESQL: Fix INLINE STATS + FORK + pushdown bug (#138633)
Fixes a bug that comes up when combining field loading pushdown, INLINE STATS, and FORK. Speeds up the pushdown rule by going in one pass.
1 parent 15b0340 commit c6a09d3

File tree

2 files changed

+81
-35
lines changed

2 files changed

+81
-35
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,9 +388,6 @@ tests:
388388
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackLookupJoinIT
389389
method: testLookupExplosionBigString
390390
issue: https://github.com/elastic/elasticsearch/issues/138510
391-
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
392-
method: test {csv-spec:inlinestats.MvMinMvExpand}
393-
issue: https://github.com/elastic/elasticsearch/issues/137679
394391
- class: org.elasticsearch.xpack.ilm.TimeSeriesLifecycleActionsIT
395392
method: testWaitForSnapshot
396393
issue: https://github.com/elastic/elasticsearch/issues/138669

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushExpressionsToFieldLoad.java

Lines changed: 81 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2424
import org.elasticsearch.xpack.esql.plan.logical.Filter;
2525
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
26+
import org.elasticsearch.xpack.esql.plan.logical.Project;
2627
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
2728
import org.elasticsearch.xpack.esql.rule.ParameterizedRule;
2829

@@ -34,9 +35,40 @@
3435
import static org.elasticsearch.xpack.esql.core.expression.Attribute.rawTemporaryName;
3536

3637
/**
37-
* Replaces vector similarity functions with a field attribute that applies
38-
* the similarity function during value loading, when one side of the function is a literal.
39-
* It also adds the new field function attribute to the EsRelation output, and adds a projection after it to remove it from the output.
38+
* Replaces {@link Expression}s that can be pushed to field loading with a field attribute
39+
* that calculates the expression during value loading. See {@link BlockLoaderExpression}
40+
* for more about how these loads are implemented and why we do this.
41+
* <p>
42+
* This rule runs in one downward (aka output-to-read) pass, making four sorts
43+
* of transformations:
44+
* </p>
45+
* <ul>
46+
* <li>
47+
* When we see a use of a <strong>new</strong> pushable function we build an
48+
* attribute for the function, record that attribute, and discard it after use.
49+
* For example, {@code EVAL l = LENGTH(message)} becomes
50+
* {@code EVAL l = $$message$LENGTH$1324$$ | DROP $$message$LENGTH$1324$$ }.
51+
* We need the {@code DROP} so we don't change the output schema.
52+
* </li>
53+
* <li>
54+
* When we see a use of a pushable function for which we already have an attribute
55+
* we just use it. This looks like the {@code l} attribute in
56+
* {@code EVAL l = LENGTH(message) | EVAL l2 = LENGTH(message)}
57+
* </li>
58+
* <li>
59+
* When we see a PROJECT, add any new attributes to the projection so we can use
60+
* them on previously visited nodes. So {@code KEEP foo | EVAL l = LENGTH(message)}
61+
* becomes
62+
* <pre>{@code
63+
* | KEEP foo, $$message$LENGTH$1324$$
64+
* | EVAL l = $$message$LENGTH$1324$$
65+
* | DROP $$message$LENGTH$1324$$
66+
* }</pre>
67+
* </li>
68+
* <li>
69+
* When we see a relation, add the attribute to it.
70+
* </li>
71+
* </ul>
4072
*/
4173
public class PushExpressionsToFieldLoad extends ParameterizedRule<LogicalPlan, LogicalPlan, LocalLogicalOptimizerContext> {
4274

@@ -56,45 +88,46 @@ private class Rule {
5688
* The primary indices, lazily initialized.
5789
*/
5890
private List<EsRelation> primaries;
59-
private boolean planWasTransformed = false;
91+
private boolean addedNewAttribute = false;
6092

6193
private Rule(LocalLogicalOptimizerContext context, LogicalPlan plan) {
6294
this.context = context;
6395
this.plan = plan;
6496
}
6597

6698
private LogicalPlan doRule(LogicalPlan plan) {
67-
planWasTransformed = false;
99+
addedNewAttribute = false;
68100
if (plan instanceof Eval || plan instanceof Filter || plan instanceof Aggregate) {
69-
LogicalPlan transformedPlan = plan.transformExpressionsOnly(Expression.class, e -> {
70-
if (e instanceof BlockLoaderExpression ble) {
71-
return transformExpression(e, ble);
72-
}
73-
return e;
74-
});
101+
return transformPotentialInvocation(plan);
102+
}
103+
if (addedAttrs.isEmpty()) {
104+
return plan;
105+
}
106+
if (plan instanceof Project project) {
107+
return transformProject(project);
108+
}
109+
if (plan instanceof EsRelation rel) {
110+
return transformRelation(rel);
111+
}
112+
return plan;
113+
}
75114

76-
// TODO rebuild everything one time rather than after each find.
77-
if (planWasTransformed == false) {
78-
return plan;
115+
private LogicalPlan transformPotentialInvocation(LogicalPlan plan) {
116+
LogicalPlan transformedPlan = plan.transformExpressionsOnly(Expression.class, e -> {
117+
if (e instanceof BlockLoaderExpression ble) {
118+
return transformExpression(e, ble);
79119
}
80-
81-
List<Attribute> previousAttrs = transformedPlan.output();
82-
// Transforms EsRelation to extract the new attributes
83-
List<Attribute> addedAttrsList = addedAttrs.values().stream().toList();
84-
transformedPlan = transformedPlan.transformDown(EsRelation.class, esRelation -> {
85-
AttributeSet updatedOutput = esRelation.outputSet().combine(AttributeSet.of(addedAttrsList));
86-
return esRelation.withAttributes(updatedOutput.stream().toList());
87-
});
88-
// Transforms Projects so the new attribute is not discarded
89-
transformedPlan = transformedPlan.transformDown(EsqlProject.class, esProject -> {
90-
List<NamedExpression> projections = new ArrayList<>(esProject.projections());
91-
projections.addAll(addedAttrsList);
92-
return esProject.withProjections(projections);
93-
});
94-
95-
return new EsqlProject(Source.EMPTY, transformedPlan, previousAttrs);
120+
return e;
121+
});
122+
if (addedNewAttribute == false) {
123+
/*
124+
* Either didn't see anything pushable or everything pushable already
125+
* has a pushed attribute.
126+
*/
127+
return plan;
96128
}
97-
return plan;
129+
// Found a new pushable attribute, discard it *after* use so we don't modify the output.
130+
return new EsqlProject(Source.EMPTY, transformedPlan, transformedPlan.output());
98131
}
99132

100133
private Expression transformExpression(Expression e, BlockLoaderExpression ble) {
@@ -109,10 +142,26 @@ private Expression transformExpression(Expression e, BlockLoaderExpression ble)
109142
if (context.searchStats().supportsLoaderConfig(fuse.field().fieldName(), fuse.config(), preference) == false) {
110143
return e;
111144
}
112-
planWasTransformed = true;
145+
addedNewAttribute = true;
113146
return replaceFieldsForFieldTransformations(e, fuse);
114147
}
115148

149+
private LogicalPlan transformProject(Project project) {
150+
// Preserve any pushed attributes so we can use them later
151+
List<NamedExpression> projections = new ArrayList<>(project.projections());
152+
projections.addAll(addedAttrs.values());
153+
return project.withProjections(projections);
154+
}
155+
156+
private LogicalPlan transformRelation(EsRelation rel) {
157+
// Add the pushed attribute
158+
if (rel.indexMode() == IndexMode.LOOKUP) {
159+
return rel;
160+
}
161+
AttributeSet updatedOutput = rel.outputSet().combine(AttributeSet.of(addedAttrs.values()));
162+
return rel.withAttributes(updatedOutput.stream().toList());
163+
}
164+
116165
private Expression replaceFieldsForFieldTransformations(Expression e, BlockLoaderExpression.PushedBlockLoaderExpression fuse) {
117166
// Change the expression to a reference of the pushed down function on the field
118167
FunctionEsField functionEsField = new FunctionEsField(fuse.field().field(), e.dataType(), fuse.config());

0 commit comments

Comments
 (0)