Skip to content

Commit

Permalink
migrate triangles
Browse files Browse the repository at this point in the history
  • Loading branch information
lassewesth committed Jun 24, 2024
1 parent 456c0a6 commit 323172d
Show file tree
Hide file tree
Showing 18 changed files with 192 additions and 54 deletions.
38 changes: 7 additions & 31 deletions algo/src/main/java/org/neo4j/gds/triangle/TriangleStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
* emitting the nodeId and the number of triangles the node is part of,
* this impl. streams the actual nodeIds of each triangle once.
*/
public final class TriangleStream extends Algorithm<Stream<TriangleStream.Result>> {
public final class TriangleStream extends Algorithm<Stream<TriangleStreamResult>> {

private final Graph graph;
private final RelationshipIntersectFactory intersectFactory;
Expand All @@ -60,7 +60,7 @@ public final class TriangleStream extends Algorithm<Stream<TriangleStream.Result
private final Concurrency concurrency;
private final int nodeCount;
private final AtomicInteger runningThreads;
private final BlockingQueue<Result> resultQueue;
private final BlockingQueue<TriangleStreamResult> resultQueue;

public static TriangleStream create(
Graph graph,
Expand Down Expand Up @@ -98,15 +98,15 @@ private TriangleStream(
}

@Override
public Stream<Result> compute() {
public Stream<TriangleStreamResult> compute() {
progressTracker.beginSubTask(graph.nodeCount());
submitTasks();
final TerminationFlag flag = getTerminationFlag();
final Iterator<Result> it = new AbstractIterator<>() {
final Iterator<TriangleStreamResult> it = new AbstractIterator<>() {

@Override
protected Result fetch() {
Result result = null;
protected TriangleStreamResult fetch() {
TriangleStreamResult result = null;
while (result == null && flag.running() && (runningThreads.get() > 0 || !resultQueue.isEmpty())) {
result = resultQueue.poll();
}
Expand Down Expand Up @@ -150,7 +150,7 @@ public final void run() {
abstract void evaluateNode(int nodeId);

void emit(long nodeA, long nodeB, long nodeC) {
Result result = new Result(
var result = new TriangleStreamResult(
graph.toOriginalNodeId(nodeA),
graph.toOriginalNodeId(nodeB),
graph.toOriginalNodeId(nodeC));
Expand All @@ -177,28 +177,4 @@ public void accept(final long nodeA, final long nodeB, final long nodeC) {
}
}

/**
* result type
*/
public static class Result {

public final long nodeA;
public final long nodeB;
public final long nodeC;

public Result(long nodeA, long nodeB, long nodeC) {
this.nodeA = nodeA;
this.nodeB = nodeB;
this.nodeC = nodeC;
}

@Override
public String toString() {
return "Triangle{" +
nodeA +
", " + nodeB +
", " + nodeC +
'}';
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.gds.triangle;

/**
* result type
*/
public class TriangleStreamResult {
public final long nodeA;
public final long nodeB;
public final long nodeC;

public TriangleStreamResult(long nodeA, long nodeB, long nodeC) {
this.nodeA = nodeA;
this.nodeB = nodeB;
this.nodeC = nodeC;
}

@Override
public String toString() {
return "Triangle{" +
nodeA +
", " + nodeB +
", " + nodeC +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.gds.conductance.ConductanceBaseConfig;
import org.neo4j.gds.conductance.ConductanceResult;
import org.neo4j.gds.config.AlgoBaseConfig;
import org.neo4j.gds.config.ConcurrencyConfig;
import org.neo4j.gds.core.concurrency.DefaultPool;
import org.neo4j.gds.core.concurrency.ParallelUtil;
import org.neo4j.gds.core.utils.paged.dss.DisjointSetStruct;
Expand Down Expand Up @@ -74,12 +75,15 @@
import org.neo4j.gds.triangle.LocalClusteringCoefficientResult;
import org.neo4j.gds.triangle.TriangleCountBaseConfig;
import org.neo4j.gds.triangle.TriangleCountResult;
import org.neo4j.gds.triangle.TriangleStream;
import org.neo4j.gds.triangle.TriangleStreamResult;
import org.neo4j.gds.wcc.Wcc;
import org.neo4j.gds.wcc.WccBaseConfig;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

import static org.neo4j.gds.modularityoptimization.ModularityOptimization.K1COLORING_MAX_ITERATIONS;

Expand Down Expand Up @@ -387,6 +391,17 @@ TriangleCountResult triangleCount(Graph graph, TriangleCountBaseConfig configura
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
}

Stream<TriangleStreamResult> triangles(Graph graph, ConcurrencyConfig configuration) {
var algorithm = TriangleStream.create(
graph,
DefaultPool.INSTANCE,
configuration.concurrency(),
terminationFlag
);

return algorithm.compute();
}

DisjointSetStruct wcc(Graph graph, WccBaseConfig configuration) {
var task = Tasks.leaf(LabelForProgressTracking.WCC.value, graph.relationshipCount());
var progressTracker = progressTrackerCreator.createProgressTracker(configuration, task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ public MemoryEstimateResult triangleCount(TriangleCountBaseConfig configuration,
);
}

MemoryEstimation triangles() {
throw new MemoryEstimationNotImplementedException();
}

public MemoryEstimation wcc(SeedConfig configuration) {
return new WccMemoryEstimateDefinition(configuration.isIncremental()).memoryEstimation();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@
import org.neo4j.gds.scc.SccStreamConfig;
import org.neo4j.gds.triangle.LocalClusteringCoefficientResult;
import org.neo4j.gds.triangle.LocalClusteringCoefficientStreamConfig;
import org.neo4j.gds.triangle.TriangleCountBaseConfig;
import org.neo4j.gds.triangle.TriangleCountResult;
import org.neo4j.gds.triangle.TriangleCountStreamConfig;
import org.neo4j.gds.triangle.TriangleStreamResult;
import org.neo4j.gds.wcc.WccStreamConfig;

import java.util.Optional;
import java.util.stream.Stream;

import static org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking.ApproximateMaximumKCut;
import static org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking.Conductance;
Expand All @@ -66,6 +69,7 @@
import static org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking.ModularityOptimization;
import static org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking.SCC;
import static org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking.TriangleCount;
import static org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking.Triangles;
import static org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking.WCC;

public class CommunityAlgorithmsStreamModeBusinessFacade {
Expand Down Expand Up @@ -291,6 +295,22 @@ public <RESULT> RESULT triangleCount(
);
}

public <RESULT> RESULT triangles(
GraphName graphName,
TriangleCountBaseConfig configuration,
ResultBuilder<TriangleCountBaseConfig, Stream<TriangleStreamResult>, RESULT, Void> resultBuilder
) {
return algorithmProcessingTemplate.processAlgorithm(
graphName,
configuration,
Triangles,
estimationFacade::triangles,
graph -> algorithms.triangles(graph, configuration),
Optional.empty(),
resultBuilder
);
}

public <RESULT> RESULT wcc(
GraphName graphName,
WccStreamConfig configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public enum Algorithm {
SteinerTree,
TopologicalSort,
TriangleCount,
Triangles,
WCC,
Yens;
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public enum LabelForProgressTracking {
SteinerTree("SteinerTree"),
TopologicalSort("TopologicalSort"),
TriangleCount("TriangleCount"),
Triangles("Triangles"),
WCC("WCC"),
Yens("Yens");

Expand Down Expand Up @@ -109,6 +110,7 @@ public static LabelForProgressTracking from(Algorithm algorithm) {
case SteinerTree -> SteinerTree;
case TopologicalSort -> TopologicalSort;
case TriangleCount -> TriangleCount;
case Triangles -> Triangles;
case WCC -> WCC;
case Yens -> Yens;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public Function<CypherMapWrapper, AlgoBaseConfig> lookup(Algorithm algorithm) {
case SteinerTree -> SteinerTreeMutateConfig::of;
case TopologicalSort -> null;
case TriangleCount -> TriangleCountMutateConfig::of;
case Triangles -> null;
case WCC -> WccMutateConfig::of;
case Yens -> ShortestPathYensMutateConfig::of;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ static CanonicalProcedureName algorithmToName(Algorithm algorithm) {
case SteinerTree -> CanonicalProcedureName.parse("gds.steinerTree");
case TopologicalSort -> null;
case TriangleCount -> CanonicalProcedureName.parse("gds.triangleCount");
case Triangles -> null;
case WCC -> CanonicalProcedureName.parse("gds.wcc");
case Yens -> CanonicalProcedureName.parse("gds.shortestPath.yens");
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ Stub get(Algorithm algorithm) {
case SteinerTree -> new SteinerTreeStub();
case TopologicalSort -> null;
case TriangleCount -> new TriangleCountStub();
case Triangles -> null;
case WCC -> new WccStub();
case Yens -> new SinglePairShortestPathYensStub();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
*/
package org.neo4j.gds.triangle;

import org.neo4j.gds.BaseProc;
import org.neo4j.gds.executor.ProcedureExecutor;
import org.neo4j.gds.procedures.GraphDataScienceProcedures;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Internal;
import org.neo4j.procedure.Name;
Expand All @@ -31,38 +31,33 @@

import static org.neo4j.procedure.Mode.READ;

public class TriangleProc extends BaseProc {

public class TriangleProc {
static final String DESCRIPTION = "Triangles streams the nodeIds of each triangle in the graph.";

@Context
public GraphDataScienceProcedures facade;

@Procedure(name = "gds.triangles", mode = READ)
@Description(DESCRIPTION)
public Stream<TriangleStream.Result> stream(
public Stream<TriangleStreamResult> stream(
@Name(value = "graphName") String graphName,
@Name(value = "configuration", defaultValue = "{}") Map<String, Object> configuration
) {
return new ProcedureExecutor<>(
new TriangleStreamSpecification(),
executionContext()
).compute(graphName, configuration);
return facade.algorithms().community().trianglesStream(graphName, configuration);
}

@Procedure(name = "gds.alpha.triangles", mode = READ, deprecatedBy = "gds.triangles")
@Description(DESCRIPTION)
@Internal
@Deprecated(forRemoval = true)
public Stream<TriangleStream.Result> alphaStream(
public Stream<TriangleStreamResult> alphaStream(
@Name(value = "graphName") String graphName,
@Name(value = "configuration", defaultValue = "{}") Map<String, Object> configuration
) {
executionContext()
.metricsFacade()
.deprecatedProcedures().called("gds.alpha.triangles");

executionContext()
facade.deprecatedProcedures().called("gds.alpha.triangles");
facade
.log()
.warn(
"Procedure `gds.alpha.triangles` has been deprecated, please use `gds.triangles`.");
.warn("Procedure `gds.alpha.triangles` has been deprecated, please use `gds.triangles`.");

return stream(graphName, configuration);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
description = DESCRIPTION,
executionMode = STREAM
)
public class TriangleStreamSpecification implements AlgorithmSpec<TriangleStream, Stream<TriangleStream.Result>, TriangleCountBaseConfig, Stream<TriangleStream.Result>, TriangleStreamFactory> {
public class TriangleStreamSpecification implements AlgorithmSpec<TriangleStream, Stream<TriangleStreamResult>, TriangleCountBaseConfig, Stream<TriangleStreamResult>, TriangleStreamFactory> {

@Override
public String name() {
Expand All @@ -55,7 +55,7 @@ public NewConfigFunction<TriangleCountBaseConfig> newConfigFunction() {
}

@Override
public ComputationResultConsumer<TriangleStream, Stream<TriangleStream.Result>, TriangleCountBaseConfig, Stream<TriangleStream.Result>> computationResultConsumer() {
public ComputationResultConsumer<TriangleStream, Stream<TriangleStreamResult>, TriangleCountBaseConfig, Stream<TriangleStreamResult>> computationResultConsumer() {
return (computationResult, executionContext) -> runWithExceptionLogging(
"Result streaming failed",
executionContext.log(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ private GraphDataScienceProcedures constructFacade() {
var communityProcedureFacade = CommunityProcedureFacade.create(
genericStub,
applicationsFacade,
null,
ProcedureReturnColumns.EMPTY,
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ private GraphDataScienceProcedures createFacade() {
var communityProcedureFacade = CommunityProcedureFacade.create(
genericStub,
applicationsFacade,
null,
ProcedureReturnColumns.EMPTY,
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.neo4j.gds.api.nodeproperties.ValueType;
import org.neo4j.gds.api.schema.GraphSchema;
import org.neo4j.gds.applications.ApplicationsFacade;
import org.neo4j.gds.applications.algorithms.machinery.AlgorithmEstimationTemplate;
import org.neo4j.gds.applications.algorithms.machinery.MemoryGuard;
import org.neo4j.gds.applications.algorithms.machinery.RequestScopedDependencies;
import org.neo4j.gds.catalog.GraphProjectProc;
Expand All @@ -69,7 +68,6 @@
import org.neo4j.gds.core.utils.warnings.EmptyUserLogRegistryFactory;
import org.neo4j.gds.extension.Neo4jGraph;
import org.neo4j.gds.logging.Log;
import org.neo4j.gds.memest.DatabaseGraphStoreEstimationService;
import org.neo4j.gds.metrics.PassthroughExecutionMetricRegistrar;
import org.neo4j.gds.metrics.algorithms.AlgorithmMetricsService;
import org.neo4j.gds.metrics.procedures.DeprecatedProceduresMetricService;
Expand Down Expand Up @@ -588,8 +586,6 @@ private GraphDataScienceProcedures constructGraphDataScienceProcedures() {
.with(new User(getUsername(), false))
.with(EmptyUserLogRegistryFactory.INSTANCE)
.build();
var algorithmEstimationTemplate = new AlgorithmEstimationTemplate(graphStoreCatalogService, new DatabaseGraphStoreEstimationService(
GraphLoaderContext.NULL_CONTEXT, requestScopedDependencies.getUser()), requestScopedDependencies);
var applicationsFacade = ApplicationsFacade.create(logMock, Optional.empty(), Optional.empty(), graphStoreCatalogService, MemoryGuard.DISABLED, new AlgorithmMetricsService(new PassthroughExecutionMetricRegistrar()), new ProjectionMetricsService(new PassthroughExecutionMetricRegistrar()), requestScopedDependencies);
var configurationParser = new ConfigurationParser(DefaultsConfiguration.Instance, LimitsConfiguration.Instance);
var configurationCreator = new ConfigurationCreator(configurationParser, null, requestScopedDependencies.getUser());
Expand All @@ -604,6 +600,7 @@ private GraphDataScienceProcedures constructGraphDataScienceProcedures() {
var communityProcedureFacade = CommunityProcedureFacade.create(
genericStub,
applicationsFacade,
null,
ProcedureReturnColumns.EMPTY,
null,
null,
Expand Down
Loading

0 comments on commit 323172d

Please sign in to comment.