2929import org .neo4j .gds .core .utils .paged .HugeAtomicLongArray ;
3030import org .neo4j .gds .core .utils .paged .HugeLongArray ;
3131import org .neo4j .gds .core .utils .progress .tasks .ProgressTracker ;
32+ import org .neo4j .gds .core .utils .queue .HugeLongPriorityQueue ;
3233import org .neo4j .gds .paths .ImmutablePathResult ;
3334import org .neo4j .gds .paths .PathResult ;
3435import org .neo4j .gds .paths .delta .TentativeDistances ;
3940import java .util .concurrent .ExecutorService ;
4041import java .util .concurrent .atomic .AtomicLong ;
4142import java .util .concurrent .atomic .LongAdder ;
43+ import java .util .concurrent .locks .ReentrantLock ;
4244import java .util .stream .Collectors ;
4345import java .util .stream .IntStream ;
4446
5658public final class SteinerBasedDeltaStepping extends Algorithm <DijkstraResult > {
5759
5860 public static final int NO_BIN = Integer .MAX_VALUE ;
61+
62+ private static final long NO_TERMINAL = -1 ;
5963 public static final int BIN_SIZE_THRESHOLD = 1000 ;
6064 private final Graph graph ;
6165 private final long startNode ;
@@ -65,11 +69,9 @@ public final class SteinerBasedDeltaStepping extends Algorithm<DijkstraResult> {
6569 private final TentativeDistances distances ;
6670 private final ExecutorService executorService ;
6771 private long pathIndex ;
68-
6972 private final long numOfTerminals ;
7073 private final BitSet unvisitedTerminal ;
7174 private final BitSet mergedWithSource ;
72-
7375 private final LongAdder metTerminals ;
7476
7577 SteinerBasedDeltaStepping (
@@ -143,24 +145,13 @@ private void syncPhase(List<SteinerBasedDeltaTask> tasks,int currentBin, AtomicL
143145 progressTracker .endSubTask ();
144146 }
145147
146- private long nextTerminal (){
147- long index =unvisitedTerminal .nextSetBit (0 );
148- long bestTerminal =index ;
149- double bestDistance =distances .distance (bestTerminal );
150- index =unvisitedTerminal .nextSetBit (index +1 );
151- while (index !=-1 ){
152- double currentDistance =distances .distance (index );
153- if (currentDistance < bestDistance ){
154- bestTerminal =index ;
155- bestDistance =currentDistance ;
156- }
157- index =unvisitedTerminal .nextSetBit (index +1 );
158- }
159- return bestTerminal ;
148+ private long nextTerminal (HugeLongPriorityQueue terminalQueue ) { // candidate terminal = whatever currently sits on top of terminalQueue
149+ return (terminalQueue .isEmpty ()) ? NO_TERMINAL : terminalQueue .top (); // NO_TERMINAL signals an empty queue; NOTE(review): top() presumably only peeks (compute() calls pop() separately) and, since the queue is created via HugeLongPriorityQueue.min(...), presumably yields the terminal with the smallest tentative distance -- confirm
160150 }
161151
162152 private boolean updateSteinerTree (long terminalId ,AtomicLong frontierIndex ,List <PathResult > paths , ImmutablePathResult .Builder pathResultBuilder ) {
163153 //add the new path to the solution
154+
164155 paths .add (pathResult (
165156 pathResultBuilder ,
166157 pathIndex ++,
@@ -182,7 +173,7 @@ private boolean updateSteinerTree(long terminalId,AtomicLong frontierIndex,List<
182173
183174 }
184175
185- private long tryToUpdateSteinerTree (long oldBin , long currentBin ) {
176+ private long tryToUpdateSteinerTree (long oldBin , long currentBin , HugeLongPriorityQueue terminalQueue ) {
186177 boolean shouldComputeClosestTerminal = false ;
187178 //Delta-Stepping differs from Dijkstra in that it processes the nodes not one-by-one but in batches,
188179 //whereas in Dijkstra, once we examine a node, we are certain we have found the shortest path to it,
@@ -194,11 +185,12 @@ private long tryToUpdateSteinerTree(long oldBin, long currentBin) {
195186 //For the moment, we use a simple criterion to discover whether there is a terminal for which, with full certainty,
196187 //we have found a shortest path to it: whenever we move from one bin to another, we find the terminal of smallest distance;
197188 //if its distance is below the lower bound of currentBin (currentBin * delta), the path to it is optimal.
197- if (currentBin == - 1 || oldBin < currentBin ) {
188+ if (currentBin == NO_BIN || oldBin < currentBin ) {
198189 shouldComputeClosestTerminal = true ;
199190 }
200191 if (shouldComputeClosestTerminal ) {
201- long terminalId = nextTerminal ();
192+ long terminalId = nextTerminal (terminalQueue );
193+ if (terminalId == NO_TERMINAL ) return NO_TERMINAL ;
202194 if (distances .distance (terminalId ) < currentBin * delta ) {
203195 return terminalId ;
204196 }
@@ -217,12 +209,14 @@ public DijkstraResult compute() {
217209 var frontierIndex = new AtomicLong (0 );
218210 var frontierSize = new AtomicLong (1 );
219211
220- List <PathResult > paths = new ArrayList <>();
212+ List <PathResult > paths = new ArrayList <>();
221213
222214 this .frontier .set (currentBin , startNode );
223215 mergedWithSource .set (startNode );
224216 this .distances .set (startNode , -1 , 0 );
225217
218+ HugeLongPriorityQueue terminalQueue = HugeLongPriorityQueue .min (unvisitedTerminal .size ());
219+ var terminalQueueLock = new ReentrantLock ();
226220 var tasks = IntStream
227221 .range (0 , concurrency )
228222 .mapToObj (i -> new SteinerBasedDeltaTask (
@@ -231,7 +225,10 @@ public DijkstraResult compute() {
231225 distances ,
232226 delta ,
233227 frontierIndex ,
234- mergedWithSource
228+ mergedWithSource ,
229+ terminalQueue ,
230+ terminalQueueLock ,
231+ unvisitedTerminal
235232 ))
236233 .collect (Collectors .toList ());
237234
@@ -244,10 +241,11 @@ public DijkstraResult compute() {
244241 // Find smallest non-empty bin across all tasks
245242 currentBin = tasks .stream ().mapToInt (SteinerBasedDeltaTask ::minNonEmptyBin ).min ().orElseThrow ();
246243
247- long terminalId = tryToUpdateSteinerTree (oldCurrentBin , currentBin );
244+ long terminalId = tryToUpdateSteinerTree (oldCurrentBin , currentBin , terminalQueue );
248245
249246 if (terminalId != -1 ) { //if we are certain that we have found a shortest path to one of the remaining terminals
250247 //we update the solution and merge its path to the root
248+ terminalQueue .pop ();
251249 shouldBreak = updateSteinerTree (terminalId , frontierIndex , paths , pathResultBuilder );
252250 currentBin = 0 ;
253251 } else { //otherwise proceed as normal, sync the contents of the bucket for each thread to the global queue.
0 commit comments