summaryrefslogtreecommitdiff
path: root/BFS/dpu
diff options
context:
space:
mode:
authorJuan Gomez Luna <juan.gomez@safari.ethz.ch>2021-06-16 19:46:05 +0200
committerJuan Gomez Luna <juan.gomez@safari.ethz.ch>2021-06-16 19:46:05 +0200
commit3de4b495fb176eba9a0eb517a4ce05903cb67acb (patch)
treefc6776a94549d2d4039898f183dbbeb2ce013ba9 /BFS/dpu
parentef5c3688c486b80a56d3c1cded25f2b2387f2668 (diff)
PrIM -- first commit
Diffstat (limited to 'BFS/dpu')
-rw-r--r--BFS/dpu/dpu-utils.h44
-rw-r--r--BFS/dpu/task.c146
2 files changed, 190 insertions, 0 deletions
diff --git a/BFS/dpu/dpu-utils.h b/BFS/dpu/dpu-utils.h
new file mode 100644
index 0000000..b02c073
--- /dev/null
+++ b/BFS/dpu/dpu-utils.h
@@ -0,0 +1,44 @@
+
+#ifndef _DPU_UTILS_H_
+#define _DPU_UTILS_H_
+
+#include <mram.h>
+
+#define PRINT_ERROR(fmt, ...) printf("\033[0;31mERROR:\033[0m "fmt"\n", ##__VA_ARGS__)
+
+static uint64_t load8B(uint32_t ptr_m, uint32_t idx, uint64_t* cache_w) {
+ mram_read((__mram_ptr void const*)(ptr_m + idx*sizeof(uint64_t)), cache_w, 8);
+ return cache_w[0];
+}
+
+static void store8B(uint64_t val, uint32_t ptr_m, uint32_t idx, uint64_t* cache_w) {
+ cache_w[0] = val;
+ mram_write(cache_w, (__mram_ptr void*)(ptr_m + idx*sizeof(uint64_t)), 8);
+}
+
+static uint32_t load4B(uint32_t ptr_m, uint32_t idx, uint64_t* cache_w) {
+ // Load 8B
+ uint32_t ptr_idx_m = ptr_m + idx*sizeof(uint32_t);
+ uint32_t offset = ((uint32_t)ptr_idx_m)%8;
+ uint32_t ptr_block_m = ptr_idx_m - offset;
+ mram_read((__mram_ptr void const*)ptr_block_m, cache_w, 8);
+ // Extract 4B
+ uint32_t* cache_32_w = (uint32_t*) cache_w;
+ return cache_32_w[offset/4];
+}
+
+static void store4B(uint32_t val, uint32_t ptr_m, uint32_t idx, uint64_t* cache_w) {
+ // Load 8B
+ uint32_t ptr_idx_m = ptr_m + idx*sizeof(uint32_t);
+ uint32_t offset = ((uint32_t)ptr_idx_m)%8;
+ uint32_t ptr_block_m = ptr_idx_m - offset;
+ mram_read((__mram_ptr void const*)ptr_block_m, cache_w, 8);
+ // Modify 4B
+ uint32_t* cache_32_w = (uint32_t*) cache_w;
+ cache_32_w[offset/4] = val;
+ // Write back 8B
+ mram_write(cache_w, (__mram_ptr void*)ptr_block_m, 8);
+}
+
+#endif
+
diff --git a/BFS/dpu/task.c b/BFS/dpu/task.c
new file mode 100644
index 0000000..43a2d0f
--- /dev/null
+++ b/BFS/dpu/task.c
@@ -0,0 +1,146 @@
+/*
+* BFS with multiple tasklets
+*
+*/
+#include <stdio.h>
+
+#include <alloc.h>
+#include <barrier.h>
+#include <defs.h>
+#include <mram.h>
+#include <mutex.h>
+#include <perfcounter.h>
+
+#include "dpu-utils.h"
+#include "../support/common.h"
+
+BARRIER_INIT(my_barrier, NR_TASKLETS);
+
+BARRIER_INIT(bfsBarrier, NR_TASKLETS);
+MUTEX_INIT(nextFrontierMutex);
+
+// main
+int main() {
+
+ if(me() == 0) {
+ mem_reset(); // Reset the heap
+ }
+ // Barrier
+ barrier_wait(&my_barrier);
+
+ // Load parameters
+ uint32_t params_m = (uint32_t) DPU_MRAM_HEAP_POINTER;
+ struct DPUParams* params_w = (struct DPUParams*) mem_alloc(ROUND_UP_TO_MULTIPLE_OF_8(sizeof(struct DPUParams)));
+ mram_read((__mram_ptr void const*)params_m, params_w, ROUND_UP_TO_MULTIPLE_OF_8(sizeof(struct DPUParams)));
+
+ // Extract parameters
+ uint32_t numGlobalNodes = params_w->numNodes;
+ uint32_t startNodeIdx = params_w->dpuStartNodeIdx;
+ uint32_t numNodes = params_w->dpuNumNodes;
+ uint32_t nodePtrsOffset = params_w->dpuNodePtrsOffset;
+ uint32_t level = params_w->level;
+ uint32_t nodePtrs_m = params_w->dpuNodePtrs_m;
+ uint32_t neighborIdxs_m = params_w->dpuNeighborIdxs_m;
+ uint32_t nodeLevel_m = params_w->dpuNodeLevel_m;
+ uint32_t visited_m = params_w->dpuVisited_m;
+ uint32_t currentFrontier_m = params_w->dpuCurrentFrontier_m;
+ uint32_t nextFrontier_m = params_w->dpuNextFrontier_m;
+
+ if(numNodes > 0) {
+
+ // Sanity check
+ if(me() == 0) {
+ if(numGlobalNodes%64 != 0) {
+ //PRINT_ERROR("The number of nodes in the graph is not a multiple of 64!");
+ }
+ if(startNodeIdx%64 != 0 || numNodes%64 != 0) {
+ //PRINT_ERROR("The number of nodes assigned to the DPU is not aligned to or a multiple of 64!");
+ }
+ }
+
+ // Allocate WRAM cache for each tasklet to use throughout
+ uint64_t* cache_w = mem_alloc(sizeof(uint64_t));
+
+ // Update current frontier and visited list based on the next frontier from the previous iteration
+ for(uint32_t nodeTileIdx = me(); nodeTileIdx < numGlobalNodes/64; nodeTileIdx += NR_TASKLETS) {
+
+ // Get the next frontier tile from MRAM
+ uint64_t nextFrontierTile = load8B(nextFrontier_m, nodeTileIdx, cache_w);
+
+ // Process next frontier tile if it is not empty
+ if(nextFrontierTile) {
+
+ // Mark everything that was previously added to the next frontier as visited
+ uint64_t visitedTile = load8B(visited_m, nodeTileIdx, cache_w);
+ visitedTile |= nextFrontierTile;
+ store8B(visitedTile, visited_m, nodeTileIdx, cache_w);
+
+ // Clear the next frontier
+ store8B(0, nextFrontier_m, nodeTileIdx, cache_w);
+
+ }
+
+ // Extract the current frontier from the previous next frontier and update node levels
+ uint32_t startTileIdx = startNodeIdx/64;
+ uint32_t numTiles = numNodes/64;
+ if(startTileIdx <= nodeTileIdx && nodeTileIdx < startTileIdx + numTiles) {
+
+ // Update current frontier
+ store8B(nextFrontierTile, currentFrontier_m, nodeTileIdx - startTileIdx, cache_w);
+
+ // Update node levels
+ if(nextFrontierTile) {
+ for(uint32_t node = nodeTileIdx*64; node < (nodeTileIdx + 1)*64; ++node) {
+ if(isSet(nextFrontierTile, node%64)) {
+ store4B(level, nodeLevel_m, node - startNodeIdx, cache_w); // No false sharing so no need for locks
+ }
+ }
+ }
+ }
+
+ }
+
+ // Wait until all tasklets have updated the current frontier
+ barrier_wait(&bfsBarrier);
+
+ // Identify tasklet's nodes
+ uint32_t numNodesPerTasklet = (numNodes + NR_TASKLETS - 1)/NR_TASKLETS;
+ uint32_t taskletNodesStart = me()*numNodesPerTasklet;
+ uint32_t taskletNumNodes;
+ if(taskletNodesStart > numNodes) {
+ taskletNumNodes = 0;
+ } else if(taskletNodesStart + numNodesPerTasklet > numNodes) {
+ taskletNumNodes = numNodes - taskletNodesStart;
+ } else {
+ taskletNumNodes = numNodesPerTasklet;
+ }
+
+ // Visit neighbors of the current frontier
+ mutex_id_t mutexID = MUTEX_GET(nextFrontierMutex);
+ for(uint32_t node = taskletNodesStart; node < taskletNodesStart + taskletNumNodes; ++node) {
+ uint32_t nodeTileIdx = node/64;
+ uint64_t currentFrontierTile = load8B(currentFrontier_m, nodeTileIdx, cache_w); // TODO: Optimize: load tile then loop over nodes in the tile
+ if(isSet(currentFrontierTile, node%64)) { // If the node is in the current frontier
+ // Visit its neighbors
+ uint32_t nodePtr = load4B(nodePtrs_m, node, cache_w) - nodePtrsOffset;
+ uint32_t nextNodePtr = load4B(nodePtrs_m, node + 1, cache_w) - nodePtrsOffset; // TODO: Optimize: might be in the same 8B as nodePtr
+ for(uint32_t i = nodePtr; i < nextNodePtr; ++i) {
+ uint32_t neighbor = load4B(neighborIdxs_m, i, cache_w); // TODO: Optimize: sequential access to neighbors can use sequential reader
+ uint32_t neighborTileIdx = neighbor/64;
+ uint64_t visitedTile = load8B(visited_m, neighborTileIdx, cache_w);
+ if(!isSet(visitedTile, neighbor%64)) { // Neighbor not previously visited
+ // Add neighbor to next frontier
+ mutex_lock(mutexID); // TODO: Optimize: use more locks to reduce contention
+ uint64_t nextFrontierTile = load8B(nextFrontier_m, neighborTileIdx, cache_w);
+ setBit(nextFrontierTile, neighbor%64);
+ store8B(nextFrontierTile, nextFrontier_m, neighborTileIdx, cache_w);
+ mutex_unlock(mutexID);
+ }
+ }
+ }
+ }
+
+ }
+
+ return 0;
+}