diff options
Diffstat (limited to 'SEL/dpu/task.c')
-rw-r--r-- | SEL/dpu/task.c | 123 |
1 files changed, 123 insertions, 0 deletions
diff --git a/SEL/dpu/task.c b/SEL/dpu/task.c new file mode 100644 index 0000000..c161d10 --- /dev/null +++ b/SEL/dpu/task.c @@ -0,0 +1,123 @@ +/* +* Select with multiple tasklets +* +*/ +#include <stdint.h> +#include <stdio.h> +#include <defs.h> +#include <mram.h> +#include <alloc.h> +#include <perfcounter.h> +#include <handshake.h> +#include <barrier.h> + +#include "../support/common.h" + +__host dpu_arguments_t DPU_INPUT_ARGUMENTS; +__host dpu_results_t DPU_RESULTS[NR_TASKLETS]; + +// Array for communication between adjacent tasklets +uint32_t message[NR_TASKLETS]; +uint32_t message_partial_count; + +// SEL in each tasklet +static unsigned int select(T *output, T *input){ + unsigned int pos = 0; + #pragma unroll + for(unsigned int j = 0; j < REGS; j++) { + if(!pred(input[j])) { + output[pos] = input[j]; + pos++; + } + } + return pos; +} + +// Handshake with adjacent tasklets +static unsigned int handshake_sync(unsigned int l_count, unsigned int tasklet_id){ + unsigned int p_count; + // Wait and read message + if(tasklet_id != 0){ + handshake_wait_for(tasklet_id - 1); + p_count = message[tasklet_id]; + } + else + p_count = 0; + // Write message and notify + if(tasklet_id < NR_TASKLETS - 1){ + message[tasklet_id + 1] = p_count + l_count; + handshake_notify(); + } + return p_count; +} + +// Barrier +BARRIER_INIT(my_barrier, NR_TASKLETS); + +extern int main_kernel1(void); + +int (*kernels[nr_kernels])(void) = {main_kernel1}; + +int main(void) { + // Kernel + return kernels[DPU_INPUT_ARGUMENTS.kernel](); +} + +// main_kernel1 +int main_kernel1() { + unsigned int tasklet_id = me(); +#if PRINT + printf("tasklet_id = %u\n", tasklet_id); +#endif + if (tasklet_id == 0){ // Initialize once the cycle counter + mem_reset(); // Reset the heap + } + // Barrier + barrier_wait(&my_barrier); + + dpu_results_t *result = &DPU_RESULTS[tasklet_id]; + + uint32_t input_size_dpu_bytes = DPU_INPUT_ARGUMENTS.size; + + // Address of the current processing block in MRAM + uint32_t base_tasklet = tasklet_id << BLOCK_SIZE_LOG2; + uint32_t mram_base_addr_A = (uint32_t)DPU_MRAM_HEAP_POINTER; + uint32_t mram_base_addr_B = (uint32_t)(DPU_MRAM_HEAP_POINTER + input_size_dpu_bytes); + + // Initialize a local cache to store the MRAM block + T *cache_A = (T *) mem_alloc(BLOCK_SIZE); + T *cache_B = (T *) mem_alloc(BLOCK_SIZE); + + // Initialize shared variable + if(tasklet_id == NR_TASKLETS - 1) + message_partial_count = 0; + // Barrier + barrier_wait(&my_barrier); + + for(unsigned int byte_index = base_tasklet; byte_index < input_size_dpu_bytes; byte_index += BLOCK_SIZE * NR_TASKLETS){ + + // Load cache with current MRAM block + mram_read((__mram_ptr void const*)(mram_base_addr_A + byte_index), cache_A, BLOCK_SIZE); + + // SELECT in each tasklet + uint32_t l_count = select(cache_B, cache_A); // In-place or out-of-place? + + // Sync with adjacent tasklets + uint32_t p_count = handshake_sync(l_count, tasklet_id); + + // Barrier + barrier_wait(&my_barrier); + + // Write cache to current MRAM block + mram_write(cache_B, (__mram_ptr void*)(mram_base_addr_B + (message_partial_count + p_count) * sizeof(T)), l_count * sizeof(T)); + + // Total count in this DPU + if(tasklet_id == NR_TASKLETS - 1){ + result->t_count = message_partial_count + p_count + l_count; + message_partial_count = result->t_count; + } + + } + + return 0; +} |