pFad - Phone/Frame/Anonymizer/Declutterfier! Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

URL: http://github.com/feldera/feldera/pull/5601/files

ssets.com/assets/primer-primitives-26e89bb5a0c37ae9.css" /> [SQL] Unused field analysis for aggregates by mihaibudiu · Pull Request #5601 · feldera/feldera · GitHub
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.dbsp.util.IndentStream;

/** Visitor which emits the circuit nodes in a graphviz file.
* The compiler options control the detail. On verbosity=0 table and view names are ommitted. */
* The compiler options control the detail. On details=0 table and view names are omitted. */
public class ToDotNodesVisitor extends CircuitVisitor {
protected final IndentStream stream;
// A higher value -> more details
Expand Down Expand Up @@ -68,7 +68,7 @@ public void endVisit() {

@Override
public VisitDecision preorder(DBSPSourceBaseOperator node) {
String name = (this.compiler.options.ioOptions.verbosity > 0 ? (node.tableName + " ") : "") + node.operation;
String name = (this.details > 0 ? (node.tableName + " ") : "") + node.operation;
this.stream.append(node.getNodeName(false))
.append(" [ shape=box style=filled fillcolor=lightgrey label=\"")
.append(node.getIdString())
Expand Down Expand Up @@ -105,7 +105,7 @@ public VisitDecision preorder(DBSPViewBaseOperator node) {
.append(node.getIdString())
.append(isMultiset(node))
.append(annotations(node))
.append(this.compiler.options.ioOptions.verbosity > 0 ? " " + node.viewName.name() : "")
.append(this.details > 0 ? " " + node.viewName.name() : "")
.append("\"")
.append(" style=filled fillcolor=lightgrey")
.append("]")
Expand Down Expand Up @@ -159,12 +159,16 @@ String getPositions(IDBSPInnerNode node) {
return result.toString();
}

static String rustToDot(String rust) {
String f = escapeString(rust);
return f.replace("\n", "\\l");
}

String convertFunction(DBSPExpression expression) {
String result = "";
if (this.details > 3) {
String f = ToRustInnerVisitor.toRustString(this.compiler(), expression, null, true);
f = escapeString(f);
result = f.replace("\n", "\\l");
result = rustToDot(f);
}
if (this.details >= 3) {
result += getPositions(expression);
Expand All @@ -178,7 +182,7 @@ String getFunction(DBSPSimpleOperator node) {
DBSPAggregateOperatorBase aggregate = node.to(DBSPAggregateOperatorBase.class);
if (aggregate.aggregateList != null) {
if (this.details > 3) {
return escapeString(aggregate.aggregateList.toString());
return rustToDot(aggregate.aggregateList.toString());
} else if (details >= 3) {
return this.getPositions(aggregate.aggregateList);
}
Expand All @@ -188,14 +192,14 @@ String getFunction(DBSPSimpleOperator node) {
node.to(DBSPPartitionedRollingAggregateWithWaterlineOperator.class);
if (aggregate.aggregateList != null) {
if (this.details > 3) {
return escapeString(aggregate.aggregateList.toString());
return rustToDot(aggregate.aggregateList.toString());
} else if (this.details >= 3) {
return this.getPositions(aggregate.aggregateList);
}
}
} else if (node.is(DBSPChainOperator.class) && this.details > 3) {
String chain = node.to(DBSPChainOperator.class).chain.toString();
return escapeString(chain);
return rustToDot(chain);
}
if (expression == null)
return "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void createOptimizer() {
this.add(new OptimizeWithGraph(compiler, g -> new RemoveNoops(compiler, g)));
AnalyzedSet<DBSPOperator> operatorsAnalyzed = new AnalyzedSet<>();
this.add(new OptimizeWithGraph(compiler,
g -> new OptimizeMaps(compiler, true, g, operatorsAnalyzed), 1));
g -> new OptimizeProjections(compiler, true, g, operatorsAnalyzed), 1));
this.add(new RemoveViewOperators(compiler, false));
this.add(new UnusedFields(compiler));
this.add(new Intern(compiler));
Expand All @@ -109,7 +109,7 @@ void createOptimizer() {
this.add(new RemoveFilters(compiler));
this.add(new OptimizeWithGraph(compiler, g -> new OptimizeProjectionVisitor(compiler, g)));
this.add(new OptimizeWithGraph(compiler,
g -> new OptimizeMaps(compiler, true, g, operatorsAnalyzed)));
g -> new OptimizeProjections(compiler, true, g, operatorsAnalyzed)));
this.add(new OptimizeWithGraph(compiler, g -> new FilterJoinVisitor(compiler, g)));
this.add(new MonotoneAnalyzer(compiler));
// Can remove this table after the monotone analysis only
Expand All @@ -131,7 +131,7 @@ void createOptimizer() {
this.add(new RemoveIdentityOperators(compiler));
this.add(new OptimizeWithGraph(compiler, g -> new ChainVisitor(compiler, g)));
this.add(new OptimizeWithGraph(compiler,
g -> new OptimizeMaps(compiler, false, g, operatorsAnalyzed)));
g -> new OptimizeProjections(compiler, false, g, operatorsAnalyzed)));
this.add(new SimplifyWaterline(compiler)
.circuitRewriter(node -> node.hasAnnotation(a -> a.is(Waterline.class))));
this.add(new EliminateDump(compiler).circuitRewriter(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,21 @@ public void postorder(DBSPStreamAggregateOperator operator) {
return;
}
OutputPort i = this.mapped(operator.input());

if (operator.aggregateList.isEmpty()) {
// Implement as a linear aggregate, since it's more efficient
DBSPDifferentiateOperator diff = new DBSPDifferentiateOperator(operator.getRelNode(), i);
this.addOperator(diff);
LinearAggregate linear = operator.aggregateList.asLinear(this.compiler);
DBSPSimpleOperator aggOp = new DBSPAggregateLinearPostprocessOperator(
operator.getRelNode(), operator.getOutputIndexedZSetType(),
linear.map, linear.postProcess, diff.outputPort());
this.addOperator(aggOp);
DBSPSimpleOperator result = new DBSPIntegrateOperator(operator.getRelNode(), aggOp.outputPort());
this.map(operator, result);
return;
}

DBSPTypeIndexedZSet inputType = i.getOutputIndexedZSetType();
boolean appendOnly = this.isAppendOnly.test(operator.input());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public void postorder(DBSPApply2Operator node) {
@Override
public void postorder(DBSPStreamAggregateOperator node) {
if (node.function != null) {
// OrderBy
// OrderBy implemented as an aggregate
super.postorder(node);
return;
}
Expand All @@ -284,7 +284,7 @@ public void postorder(DBSPStreamAggregateOperator node) {
@Override
public void postorder(DBSPAggregateOperator node) {
if (node.function != null) {
// OrderBy
// OrderBy implemented as an aggregate
super.postorder(node);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.apache.calcite.util.Pair;
import org.dbsp.sqlCompiler.circuit.annotation.IsProjection;
import org.dbsp.sqlCompiler.circuit.operator.DBSPAggregateZeroOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPAsofJoinOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPAntiJoinOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPApplyOperator;
Expand All @@ -22,6 +23,7 @@
import org.dbsp.sqlCompiler.circuit.operator.DBSPNegateOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPNoopOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPStreamAggregateOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPStreamAntiJoinOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPStreamJoinIndexOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPSimpleOperator;
Expand All @@ -41,9 +43,13 @@
import org.dbsp.sqlCompiler.compiler.visitors.inner.Projection;
import org.dbsp.sqlCompiler.compiler.visitors.inner.ResolveReferences;
import org.dbsp.sqlCompiler.compiler.visitors.inner.Substitution;
import org.dbsp.sqlCompiler.compiler.visitors.unusedFields.FieldUseMap;
import org.dbsp.sqlCompiler.compiler.visitors.unusedFields.FindUnusedFields;
import org.dbsp.sqlCompiler.ir.DBSPParameter;
import org.dbsp.sqlCompiler.ir.IDBSPDeclaration;
import org.dbsp.sqlCompiler.ir.IDBSPInnerNode;
import org.dbsp.sqlCompiler.ir.aggregate.DBSPAggregateList;
import org.dbsp.sqlCompiler.ir.aggregate.IAggregate;
import org.dbsp.sqlCompiler.ir.expression.DBSPBaseTupleExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPClosureExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
Expand All @@ -54,7 +60,9 @@
import org.dbsp.sqlCompiler.ir.expression.DBSPTupleExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPVariablePath;
import org.dbsp.sqlCompiler.ir.type.DBSPType;
import org.dbsp.sqlCompiler.ir.type.derived.DBSPTypeFunction;
import org.dbsp.sqlCompiler.ir.type.derived.DBSPTypeRawTuple;
import org.dbsp.sqlCompiler.ir.type.derived.DBSPTypeTuple;
import org.dbsp.sqlCompiler.ir.type.derived.DBSPTypeTupleBase;
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeIndexedZSet;
import org.dbsp.util.Linq;
Expand All @@ -64,15 +72,16 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Optimizes patterns containing Map operators. */
public class OptimizeMaps extends CircuitCloneWithGraphsVisitor {
/** Optimizes projections (Map or MapIndex) following various other operators. */
public class OptimizeProjections extends CircuitCloneWithGraphsVisitor {
/** If true only optimize projections after joins */
final boolean onlyProjections;
final AnalyzedSet<DBSPOperator> operatorsAnalyzed;

public OptimizeMaps(DBSPCompiler compiler, boolean onlyProjections,
CircuitGraphs graphs, AnalyzedSet<DBSPOperator> operatorsAnalyzed) {
public OptimizeProjections(DBSPCompiler compiler, boolean onlyProjections,
CircuitGraphs graphs, AnalyzedSet<DBSPOperator> operatorsAnalyzed) {
super(compiler, graphs, false);
this.onlyProjections = onlyProjections;
this.operatorsAnalyzed = operatorsAnalyzed;
Expand Down Expand Up @@ -175,6 +184,21 @@ public void postorder(DBSPMapIndexOperator operator) {
.to(DBSPSimpleOperator.class);
this.map(operator, result, operator != result);
return;
} else if (source.node().is(DBSPStreamAggregateOperator.class)) {
DBSPStreamAggregateOperator aggregate = source.node().to(DBSPStreamAggregateOperator.class);
Logger.INSTANCE.belowLevel(this, 2)
.appendSupplier(() -> source.simpleNode().operation + " -> MapIndex")
.newline();
Projection projection = new Projection(this.compiler(), true, true);
projection.apply(operator.getFunction());
if (!projection.isProjection || aggregate.aggregateList == null) {
super.postorder(operator);
return;
}
boolean replaced = this.processAggregate(aggregate, operator);
if (!replaced)
super.postorder(operator);
return;
} else {
Projection projection = new Projection(this.compiler());
projection.apply(operator.getFunction());
Expand Down Expand Up @@ -391,6 +415,73 @@ public void postorder(DBSPApplyOperator operator) {
super.postorder(operator);
}

/** Process an aggregate followed by a MapIndex operator; return 'true' if the operators
* required changes, false if they are unchanged. When the operator requires change,
* it is inserted in the circuit and map is remapped to the new operator. */
boolean processAggregate(DBSPStreamAggregateOperator aggregate, DBSPMapIndexOperator map) {
FindUnusedFields unused = new FindUnusedFields(this.compiler);
DBSPClosureExpression function = map.getClosureFunction();
Utilities.enforce(function.parameters.length == 1);
unused.findUnusedFields(function);
FieldUseMap useMap = unused.parameterFieldMap.get(function.parameters[0]);
FieldUseMap valueUse = useMap.field(1);
DBSPTypeIndexedZSet ix = aggregate.getOutputIndexedZSetType();

if (valueUse.hasUnusedFields(1)) {
List<Integer> usedFields = valueUse.deref().getUsedFields();
// We must determine whether we can remove any aggregates from the aggregate list.
// An aggregate can be removed if none of the fields it is producing are needed.
final List<IAggregate> used = new ArrayList<>();
{
int index = 0;
for (IAggregate agg : Objects.requireNonNull(aggregate.aggregateList).aggregates) {
if (usedFields.contains(index))
used.add(agg);
index++;
}
}
if (used.size() < aggregate.aggregateList.aggregates.size()) {
// Build a new aggregate which only generates the used fields
DBSPAggregateList list = new DBSPAggregateList(
aggregate.aggregateList.getNode(), aggregate.aggregateList.rowVar, used);
DBSPType elementType = list.getType().to(DBSPTypeFunction.class).resultType;
DBSPStreamAggregateOperator newAggregate = new DBSPStreamAggregateOperator(
aggregate.getRelNode(), new DBSPTypeIndexedZSet(ix.getNode(), ix.keyType, elementType),
null, list, aggregate.input());
this.addOperator(newAggregate);

// Synthesize a MapIndex after the aggregate which fills the removed spaces with the
// "zeros" from the removed aggregates
DBSPVariablePath var = newAggregate.getOutputIndexedZSetType().getKVRefType().var();
int index = 0;
int newAggregateIndex = 0;
final List<DBSPExpression> fields = new ArrayList<>();
for (IAggregate agg : Objects.requireNonNull(aggregate.aggregateList).aggregates) {
if (usedFields.contains(index)) {
fields.add(var.field(1).deref().field(newAggregateIndex));
newAggregateIndex++;
} else {
fields.add(agg.getEmptySetResult());
}
index++;
}

DBSPClosureExpression closure = new DBSPRawTupleExpression(
new DBSPTupleExpression(DBSPTypeTuple.flatten(var.field(0).deref()), false),
new DBSPTupleExpression(fields, false))
.closure(var);
DBSPMapIndexOperator filler = new DBSPMapIndexOperator(map.getRelNode(), closure, newAggregate.outputPort());
this.addOperator(filler);

DBSPSimpleOperator rewritten = map.withInputs(Linq.list(filler.outputPort()), false)
.to(DBSPSimpleOperator.class);
this.map(map, rewritten);
return true;
}
}
return false;
}

@Override
public void postorder(DBSPMapOperator operator) {
if (this.done(operator)) {
Expand Down Expand Up @@ -455,7 +546,7 @@ public void postorder(DBSPMapOperator operator) {
return;
}
} else if (source.node().is(DBSPFlatMapOperator.class)
&& operator.getFunction().is(DBSPClosureExpression.class)) {
&& operator.getFunction().is(DBSPClosureExpression.class) && inputFanout == 1) {
DBSPFlatmap sourceFlatmap = source.simpleNode().getFunction().as(DBSPFlatmap.class);
var shuffle = DetectShuffle.analyze(this.compiler, operator.getClosureFunction());
if (sourceFlatmap != null && shuffle != null) {
Expand Down Expand Up @@ -535,15 +626,37 @@ public void postorder(DBSPMapOperator operator) {
.to(DBSPSimpleOperator.class);
this.map(operator, result, operator != result);
return;
} else if (source.node().is(DBSPAggregateZeroOperator.class) && inputFanout == 1) {
Logger.INSTANCE.belowLevel(this, 2)
.appendSupplier(() -> source.simpleNode().operation + " -> Map")
.newline();
Projection projection = new Projection(this.compiler(), true, true);
projection.apply(operator.getFunction());
if (!projection.isProjection) {
super.postorder(operator);
return;
}
DBSPAggregateZeroOperator zero = source.node().to(DBSPAggregateZeroOperator.class);
DBSPSimpleOperator newProjection = operator
.withInputs(Linq.list(zero.input()), true).to(DBSPSimpleOperator.class);
this.addOperator(newProjection);
DBSPExpression newZero = operator.getClosureFunction()
.call(zero.getFunction().borrow())
.reduce(this.compiler);
DBSPAggregateZeroOperator result = new DBSPAggregateZeroOperator(
zero.getRelNode(), newZero, newProjection.outputPort());
this.map(operator, result);
return;
}

if (source.node().is(DBSPAntiJoinOperator.class) || source.node().is(DBSPStreamAntiJoinOperator.class)) {
Logger.INSTANCE.belowLevel(this, 2)
.appendSupplier(() -> source.simpleNode().operation + " -> Map")
.newline();
DBSPBinaryOperator join = source.node().to(DBSPBinaryOperator.class);
Projection projection = new Projection(this.compiler());
projection.apply(operator.getFunction());
if (projection.isProjection) {
if (projection.isProjection && inputFanout == 1) {
OutputPort left = join.left();
OutputPort right = join.right();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.dbsp.sqlCompiler.compiler.visitors.outer.Conditional;
import org.dbsp.sqlCompiler.compiler.visitors.outer.DeadCode;
import org.dbsp.sqlCompiler.compiler.visitors.outer.Graph;
import org.dbsp.sqlCompiler.compiler.visitors.outer.OptimizeMaps;
import org.dbsp.sqlCompiler.compiler.visitors.outer.OptimizeProjections;
import org.dbsp.sqlCompiler.compiler.visitors.outer.OptimizeWithGraph;
import org.dbsp.sqlCompiler.compiler.visitors.outer.Passes;
import org.dbsp.sqlCompiler.compiler.visitors.outer.Repeat;
Expand Down Expand Up @@ -55,7 +55,7 @@ static class OnePass extends Passes {

Graph graph0 = new Graph(compiler);
this.add(graph0);
this.add(new OptimizeMaps(compiler, true, graph0.getGraphs(), mapOperators));
this.add(new OptimizeProjections(compiler, true, graph0.getGraphs(), mapOperators));
this.add(new DeadCode(compiler, true));

Graph graph = new Graph(compiler);
Expand Down
Loading
pFad - Phonifier reborn

Pfad - The Proxy pFad © 2024 Your Company Name. All rights reserved.





Check this box to remove all script contents from the fetched content.



Check this box to remove all images from the fetched content.


Check this box to remove all CSS styles from the fetched content.


Check this box to keep images inefficiently compressed and original size.

Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.


Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy