diff options
Diffstat (limited to 'BFS/dpu/task.c')
-rw-r--r-- | BFS/dpu/task.c | 274 |
1 files changed, 151 insertions, 123 deletions
diff --git a/BFS/dpu/task.c b/BFS/dpu/task.c index 43a2d0f..44ec214 100644 --- a/BFS/dpu/task.c +++ b/BFS/dpu/task.c @@ -20,127 +20,155 @@ BARRIER_INIT(bfsBarrier, NR_TASKLETS); MUTEX_INIT(nextFrontierMutex); // main -int main() { - - if(me() == 0) { - mem_reset(); // Reset the heap - } - // Barrier - barrier_wait(&my_barrier); - - // Load parameters - uint32_t params_m = (uint32_t) DPU_MRAM_HEAP_POINTER; - struct DPUParams* params_w = (struct DPUParams*) mem_alloc(ROUND_UP_TO_MULTIPLE_OF_8(sizeof(struct DPUParams))); - mram_read((__mram_ptr void const*)params_m, params_w, ROUND_UP_TO_MULTIPLE_OF_8(sizeof(struct DPUParams))); - - // Extract parameters - uint32_t numGlobalNodes = params_w->numNodes; - uint32_t startNodeIdx = params_w->dpuStartNodeIdx; - uint32_t numNodes = params_w->dpuNumNodes; - uint32_t nodePtrsOffset = params_w->dpuNodePtrsOffset; - uint32_t level = params_w->level; - uint32_t nodePtrs_m = params_w->dpuNodePtrs_m; - uint32_t neighborIdxs_m = params_w->dpuNeighborIdxs_m; - uint32_t nodeLevel_m = params_w->dpuNodeLevel_m; - uint32_t visited_m = params_w->dpuVisited_m; - uint32_t currentFrontier_m = params_w->dpuCurrentFrontier_m; - uint32_t nextFrontier_m = params_w->dpuNextFrontier_m; - - if(numNodes > 0) { - - // Sanity check - if(me() == 0) { - if(numGlobalNodes%64 != 0) { - //PRINT_ERROR("The number of nodes in the graph is not a multiple of 64!"); - } - if(startNodeIdx%64 != 0 || numNodes%64 != 0) { - //PRINT_ERROR("The number of nodes assigned to the DPU is not aligned to or a multiple of 64!"); - } - } - - // Allocate WRAM cache for each tasklet to use throughout - uint64_t* cache_w = mem_alloc(sizeof(uint64_t)); - - // Update current frontier and visited list based on the next frontier from the previous iteration - for(uint32_t nodeTileIdx = me(); nodeTileIdx < numGlobalNodes/64; nodeTileIdx += NR_TASKLETS) { - - // Get the next frontier tile from MRAM - uint64_t nextFrontierTile = load8B(nextFrontier_m, nodeTileIdx, cache_w); - - // Process next frontier tile if it is not empty - if(nextFrontierTile) { - - // Mark everything that was previously added to the next frontier as visited - uint64_t visitedTile = load8B(visited_m, nodeTileIdx, cache_w); - visitedTile |= nextFrontierTile; - store8B(visitedTile, visited_m, nodeTileIdx, cache_w); - - // Clear the next frontier - store8B(0, nextFrontier_m, nodeTileIdx, cache_w); - - } - - // Extract the current frontier from the previous next frontier and update node levels - uint32_t startTileIdx = startNodeIdx/64; - uint32_t numTiles = numNodes/64; - if(startTileIdx <= nodeTileIdx && nodeTileIdx < startTileIdx + numTiles) { - - // Update current frontier - store8B(nextFrontierTile, currentFrontier_m, nodeTileIdx - startTileIdx, cache_w); - - // Update node levels - if(nextFrontierTile) { - for(uint32_t node = nodeTileIdx*64; node < (nodeTileIdx + 1)*64; ++node) { - if(isSet(nextFrontierTile, node%64)) { - store4B(level, nodeLevel_m, node - startNodeIdx, cache_w); // No false sharing so no need for locks - } - } - } - } - - } - - // Wait until all tasklets have updated the current frontier - barrier_wait(&bfsBarrier); - - // Identify tasklet's nodes - uint32_t numNodesPerTasklet = (numNodes + NR_TASKLETS - 1)/NR_TASKLETS; - uint32_t taskletNodesStart = me()*numNodesPerTasklet; - uint32_t taskletNumNodes; - if(taskletNodesStart > numNodes) { - taskletNumNodes = 0; - } else if(taskletNodesStart + numNodesPerTasklet > numNodes) { - taskletNumNodes = numNodes - taskletNodesStart; - } else { - taskletNumNodes = numNodesPerTasklet; - } - - // Visit neighbors of the current frontier - mutex_id_t mutexID = MUTEX_GET(nextFrontierMutex); - for(uint32_t node = taskletNodesStart; node < taskletNodesStart + taskletNumNodes; ++node) { - uint32_t nodeTileIdx = node/64; - uint64_t currentFrontierTile = load8B(currentFrontier_m, nodeTileIdx, cache_w); // TODO: Optimize: load tile then loop over nodes in the tile - if(isSet(currentFrontierTile, node%64)) { // If the node is in the current frontier - // Visit its neighbors - uint32_t nodePtr = load4B(nodePtrs_m, node, cache_w) - nodePtrsOffset; - uint32_t nextNodePtr = load4B(nodePtrs_m, node + 1, cache_w) - nodePtrsOffset; // TODO: Optimize: might be in the same 8B as nodePtr - for(uint32_t i = nodePtr; i < nextNodePtr; ++i) { - uint32_t neighbor = load4B(neighborIdxs_m, i, cache_w); // TODO: Optimize: sequential access to neighbors can use sequential reader - uint32_t neighborTileIdx = neighbor/64; - uint64_t visitedTile = load8B(visited_m, neighborTileIdx, cache_w); - if(!isSet(visitedTile, neighbor%64)) { // Neighbor not previously visited - // Add neighbor to next frontier - mutex_lock(mutexID); // TODO: Optimize: use more locks to reduce contention - uint64_t nextFrontierTile = load8B(nextFrontier_m, neighborTileIdx, cache_w); - setBit(nextFrontierTile, neighbor%64); - store8B(nextFrontierTile, nextFrontier_m, neighborTileIdx, cache_w); - mutex_unlock(mutexID); - } - } - } - } - - } - - return 0; +int main() +{ + + if (me() == 0) { + mem_reset(); // Reset the heap + } + // Barrier + barrier_wait(&my_barrier); + + // Load parameters + uint32_t params_m = (uint32_t) DPU_MRAM_HEAP_POINTER; + struct DPUParams *params_w = + (struct DPUParams *) + mem_alloc(ROUND_UP_TO_MULTIPLE_OF_8(sizeof(struct DPUParams))); + mram_read((__mram_ptr void const *)params_m, params_w, + ROUND_UP_TO_MULTIPLE_OF_8(sizeof(struct DPUParams))); + + // Extract parameters + uint32_t numGlobalNodes = params_w->numNodes; + uint32_t startNodeIdx = params_w->dpuStartNodeIdx; + uint32_t numNodes = params_w->dpuNumNodes; + uint32_t nodePtrsOffset = params_w->dpuNodePtrsOffset; + uint32_t level = params_w->level; + uint32_t nodePtrs_m = params_w->dpuNodePtrs_m; + uint32_t neighborIdxs_m = params_w->dpuNeighborIdxs_m; + uint32_t nodeLevel_m = params_w->dpuNodeLevel_m; + uint32_t visited_m = params_w->dpuVisited_m; + uint32_t currentFrontier_m = params_w->dpuCurrentFrontier_m; + uint32_t nextFrontier_m = params_w->dpuNextFrontier_m; + + if (numNodes > 0) { + + // Sanity check + if (me() == 0) { + if (numGlobalNodes % 64 != 0) { + //PRINT_ERROR("The number of nodes in the graph is not a multiple of 64!"); + } + if (startNodeIdx % 64 != 0 || numNodes % 64 != 0) { + //PRINT_ERROR("The number of nodes assigned to the DPU is not aligned to or a multiple of 64!"); + } + } + // Allocate WRAM cache for each tasklet to use throughout + uint64_t *cache_w = mem_alloc(sizeof(uint64_t)); + + // Update current frontier and visited list based on the next frontier from the previous iteration + for (uint32_t nodeTileIdx = me(); + nodeTileIdx < numGlobalNodes / 64; + nodeTileIdx += NR_TASKLETS) { + + // Get the next frontier tile from MRAM + uint64_t nextFrontierTile = + load8B(nextFrontier_m, nodeTileIdx, cache_w); + + // Process next frontier tile if it is not empty + if (nextFrontierTile) { + + // Mark everything that was previously added to the next frontier as visited + uint64_t visitedTile = + load8B(visited_m, nodeTileIdx, cache_w); + visitedTile |= nextFrontierTile; + store8B(visitedTile, visited_m, nodeTileIdx, + cache_w); + + // Clear the next frontier + store8B(0, nextFrontier_m, nodeTileIdx, + cache_w); + + } + // Extract the current frontier from the previous next frontier and update node levels + uint32_t startTileIdx = startNodeIdx / 64; + uint32_t numTiles = numNodes / 64; + if (startTileIdx <= nodeTileIdx + && nodeTileIdx < startTileIdx + numTiles) { + + // Update current frontier + store8B(nextFrontierTile, currentFrontier_m, + nodeTileIdx - startTileIdx, cache_w); + + // Update node levels + if (nextFrontierTile) { + for (uint32_t node = nodeTileIdx * 64; + node < (nodeTileIdx + 1) * 64; + ++node) { + if (isSet + (nextFrontierTile, + node % 64)) { + store4B(level, nodeLevel_m, node - startNodeIdx, cache_w); // No false sharing so no need for locks + } + } + } + } + + } + + // Wait until all tasklets have updated the current frontier + barrier_wait(&bfsBarrier); + + // Identify tasklet's nodes + uint32_t numNodesPerTasklet = + (numNodes + NR_TASKLETS - 1) / NR_TASKLETS; + uint32_t taskletNodesStart = me() * numNodesPerTasklet; + uint32_t taskletNumNodes; + if (taskletNodesStart > numNodes) { + taskletNumNodes = 0; + } else if (taskletNodesStart + numNodesPerTasklet > numNodes) { + taskletNumNodes = numNodes - taskletNodesStart; + } else { + taskletNumNodes = numNodesPerTasklet; + } + + // Visit neighbors of the current frontier + mutex_id_t mutexID = MUTEX_GET(nextFrontierMutex); + for (uint32_t node = taskletNodesStart; + node < taskletNodesStart + taskletNumNodes; ++node) { + uint32_t nodeTileIdx = node / 64; + uint64_t currentFrontierTile = load8B(currentFrontier_m, nodeTileIdx, cache_w); // TODO: Optimize: load tile then loop over nodes in the tile + if (isSet(currentFrontierTile, node % 64)) { // If the node is in the current frontier + // Visit its neighbors + uint32_t nodePtr = + load4B(nodePtrs_m, node, + cache_w) - nodePtrsOffset; + uint32_t nextNodePtr = load4B(nodePtrs_m, node + 1, cache_w) - nodePtrsOffset; // TODO: Optimize: might be in the same 8B as nodePtr + for (uint32_t i = nodePtr; i < nextNodePtr; ++i) { + uint32_t neighbor = load4B(neighborIdxs_m, i, cache_w); // TODO: Optimize: sequential access to neighbors can use sequential reader + uint32_t neighborTileIdx = + neighbor / 64; + uint64_t visitedTile = + load8B(visited_m, neighborTileIdx, + cache_w); + if (!isSet(visitedTile, neighbor % 64)) { // Neighbor not previously visited + // Add neighbor to next frontier + mutex_lock(mutexID); // TODO: Optimize: use more locks to reduce contention + uint64_t nextFrontierTile = + load8B(nextFrontier_m, + neighborTileIdx, + cache_w); + setBit(nextFrontierTile, + neighbor % 64); + store8B(nextFrontierTile, + nextFrontier_m, + neighborTileIdx, + cache_w); + mutex_unlock(mutexID); + } + } + } + } + + } + + return 0; } |