Skip to content

Commit

Permalink
GH-2924: Lateral - fixed injection for tables, enhanced QueryExec API…
Browse files Browse the repository at this point in the history
… for easier testing.
  • Loading branch information
Aklakan committed Feb 1, 2025
1 parent 17dc85d commit 2783255
Show file tree
Hide file tree
Showing 26 changed files with 558 additions and 110 deletions.
19 changes: 12 additions & 7 deletions jena-arq/src/main/java/org/apache/jena/sparql/algebra/Algebra.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,22 +161,27 @@ public static Binding merge(Binding bindingLeft, Binding bindingRight) {

// If compatible, merge. Iterate over variables in right but not in left.
BindingBuilder b = Binding.builder(bindingLeft);
for ( Iterator<Var> vIter = bindingRight.vars() ; vIter.hasNext() ; ) {
Var v = vIter.next();
Node n = bindingRight.get(v);
bindingRight.forEach((v, n) -> {
if ( !bindingLeft.contains(v) )
b.add(v, n);
}
});
return b.build();
}

public static boolean compatible(Binding bindingLeft, Binding bindingRight) {
// Test to see if compatible: Iterate over variables in left
for ( Iterator<Var> vIter = bindingLeft.vars() ; vIter.hasNext() ; ) {
Var v = vIter.next();
return compatible(bindingLeft, bindingRight, bindingLeft.vars());
}

/** Test to see if bindings are compatible for all variables of the provided iterator. */
public static boolean compatible(Binding bindingLeft, Binding bindingRight, Iterator<Var> vars) {
while (vars.hasNext() ) {
Var v = vars.next();
Node nLeft = bindingLeft.get(v);
Node nRight = bindingRight.get(v);
if ( nLeft == null )
continue;

Node nRight = bindingRight.get(v);
if ( nRight != null && !nRight.equals(nLeft) )
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,55 @@

import org.apache.jena.graph.Node ;
import org.apache.jena.sparql.algebra.table.Table1 ;
import org.apache.jena.sparql.algebra.table.TableBuilder;
import org.apache.jena.sparql.algebra.table.TableEmpty ;
import org.apache.jena.sparql.algebra.table.TableN ;
import org.apache.jena.sparql.algebra.table.TableUnit ;
import org.apache.jena.sparql.core.Var ;
import org.apache.jena.sparql.engine.QueryIterator ;
import org.apache.jena.sparql.engine.binding.Binding ;
import org.apache.jena.sparql.exec.RowSet ;

public class TableFactory
{
public static Table createUnit()
{ return new TableUnit() ; }

public static Table createEmpty()
{ return new TableEmpty() ; }

public static Table create()
{ return new TableN() ; }

public static Table create(List<Var> vars)
{ return new TableN(vars) ; }

public static Table create(QueryIterator queryIterator)
{
{
if ( queryIterator.isJoinIdentity() ) {
queryIterator.close();
return createUnit() ;
}
return new TableN(queryIterator) ;

return builder().consumeRowsAndVars(queryIterator).build();
}

public static Table create(Var var, Node value)
{ return new Table1(var, value) ; }

/** Creates a table from the detached bindings of the row set. */
public static Table create(RowSet rs)
{
TableBuilder builder = builder();
builder.addVars(rs.getResultVars());
rs.forEach(row -> {
Binding b = row.detach();
builder.addRow(b);
});
return builder.build();
}

public static TableBuilder builder() {
return new TableBuilder();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.jena.sparql.algebra.table;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

import org.apache.jena.sparql.algebra.Table;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.QueryIterator;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.engine.join.ImmutableUniqueList;

/**
* Builder for immutable instances of {@link Table}.
* This builder is not thread safe.
*/
public class TableBuilder {
private ImmutableUniqueList.Builder<Var> varsBuilder = ImmutableUniqueList.newUniqueListBuilder(Var.class);

private List<Binding> rows = new ArrayList<>();
private boolean copyRowsOnNextMutation = false;

public TableBuilder() {
super();
}

public static TableBuilder of(Table table) {
return of(table.getVars(), table.rows());
}

public static TableBuilder of(Collection<Var> vars, Iterator<Binding> rows) {
TableBuilder result = new TableBuilder();
result.addVars(vars);
result.addRows(rows);
return result;
}

// Vars ----

/** Returns an immutable snapshot of this builder's current variables. */
public List<Var> snapshotVars() {
return varsBuilder.build();
}

public int sizeVars() {
return varsBuilder.size();
}

public TableBuilder addVar(Var var) {
varsBuilder.add(var);
return this;
}

public TableBuilder addVars(Collection<Var> vars) {
varsBuilder.addAll(vars);
return this;
}

public TableBuilder addVars(Iterator<Var> vars) {
vars.forEachRemaining(varsBuilder::add);
return this;
}

/** Adds the variables of a binding but not the binding itself. */
public TableBuilder addVarsFromRow(Binding row) {
row.vars().forEachRemaining(varsBuilder::add);
return this;
}

// Rows -----

private void copyRowsIfNeeded() {
if (copyRowsOnNextMutation) {
rows = new ArrayList<>(rows);
copyRowsOnNextMutation = false;
}
}

/** Returns an immutable snapshot of this builder's current rows. */
public List<Binding> snapshotRows() {
return List.copyOf(rows);
}

public int sizeRows() {
return rows.size();
}

public TableBuilder addRow(Binding row) {
copyRowsIfNeeded();
rows.add(row);
return this;
}

public TableBuilder addRows(Collection<Binding> newRows) {
copyRowsIfNeeded();
rows.addAll(newRows);
return this;
}

public TableBuilder addRows(Iterator<Binding> newRows) {
copyRowsIfNeeded();
newRows.forEachRemaining(rows::add);
return this;
}

// Rows and Vars -----

/** This method assumes prior call to copyRowsIfNeeded(). */
private void addRowAndVarsInternal(Binding row) {
addVarsFromRow(row);
rows.add(row);
}

public TableBuilder addRowAndVars(Binding row) {
copyRowsIfNeeded();
addRowAndVarsInternal(row);
return this;
}

public TableBuilder addRowsAndVars(Collection<Binding> newRows) {
copyRowsIfNeeded();
newRows.forEach(this::addVarsFromRow);
rows.addAll(newRows);
return this;
}

public TableBuilder addRowsAndVars(Iterator<Binding> newRows) {
copyRowsIfNeeded();
newRows.forEachRemaining(this::addRowAndVarsInternal);
return this;
}

/**
* Similar to {@link #addRowsAndVars(Iterator)} but
* also closes the given QueryIterator when done.
*/
public TableBuilder consumeRowsAndVars(QueryIterator qIter) {
Objects.requireNonNull(qIter);
try {
addRowsAndVars(qIter);
} finally {
qIter.close();
}
return this;
}

// General -----

public TableBuilder resetVars() {
varsBuilder.clear();
return this;
}

public TableBuilder resetRows() {
if (copyRowsOnNextMutation) {
rows = new ArrayList<>();
copyRowsOnNextMutation = false;
} else {
rows.clear();
}
return this;
}

/** Reset variables and rows. */
public TableBuilder reset() {
resetVars();
resetRows();
return this;
}

public Table build() {
List<Var> finalVars = snapshotVars();
List<Binding> finalRows = Collections.unmodifiableList(rows);
copyRowsOnNextMutation = true;
return new TableData(finalVars, finalRows);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,21 @@

package org.apache.jena.sparql.algebra.table ;

import java.util.Collections;
import java.util.List ;

import org.apache.jena.sparql.ARQException ;
import org.apache.jena.sparql.core.Var ;
import org.apache.jena.sparql.engine.binding.Binding ;

/** Immutable table. */
public class TableData extends TableN {
public TableData(List<Var> variables, List<Binding> rows) {
super(variables, rows) ;
super(Collections.unmodifiableList(variables), Collections.unmodifiableList(rows)) ;
}

@Override
public void addBinding(Binding binding) {
throw new ARQException("Can't add bindings to an existing data table") ;
}

public List<Binding> getRows() {
return rows ;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList ;
import java.util.Iterator ;
import java.util.List ;
import java.util.Objects ;

import org.apache.jena.sparql.core.Var ;
import org.apache.jena.sparql.engine.ExecutionContext ;
Expand Down Expand Up @@ -49,15 +50,12 @@ public TableN(QueryIterator qIter) {
}

protected TableN(List<Var> variables, List<Binding> rows) {
this.vars = variables ;
this.rows = rows ;
this.vars = Objects.requireNonNull(variables) ;
this.rows = Objects.requireNonNull(rows) ;
}

private void materialize(QueryIterator qIter) {
while (qIter.hasNext()) {
Binding binding = qIter.nextBinding() ;
addBinding(binding) ;
}
qIter.forEachRemaining(this::addBinding);
qIter.close() ;
}

Expand Down Expand Up @@ -105,4 +103,8 @@ public List<String> getVarNames() {
public List<Var> getVars() {
return vars ;
}

public List<Binding> getRows() {
return rows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,11 @@ public default boolean contains(String varName) {

@Override
public boolean equals(Object other);

/**
* Returns a binding which is guaranteed to be independent of
* any resources such as an ongoing query execution or a disk-based dataset.
* May return itself if it is already detached.
*/
public Binding detach();
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ protected void forEach1(BiConsumer<Var, Node> action) { }

@Override
protected Node get1(Var var) { return null; }

@Override
protected Binding detachWithNewParent(Binding newParent) {
return new Binding0(newParent);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,9 @@ protected Node get1(Var v) {
return value;
return null;
}

@Override
protected Binding detachWithNewParent(Binding newParent) {
return new Binding1(newParent, var, value);
}
}
Loading

0 comments on commit 2783255

Please sign in to comment.