From d3ad69999ddabd7540ab96a92f56300803f608f5 Mon Sep 17 00:00:00 2001 From: Daniel Friesel Date: Tue, 24 Apr 2018 10:46:32 +0200 Subject: PTA: Use parameter dictionary internally --- lib/automata.py | 76 ++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 54 insertions(+), 22 deletions(-) (limited to 'lib') diff --git a/lib/automata.py b/lib/automata.py index c61b065..0948cee 100755 --- a/lib/automata.py +++ b/lib/automata.py @@ -1,10 +1,23 @@ +from dfatool import AnalyticFunction + +def _parse_function(input_function): + if type('input_function') == 'str': + raise NotImplemented + if type('input_function') == 'function': + return 'raise ValueError', input_function + raise ValueError('Function description must be provided as string or function') + +def _dict_to_list(input_dict): + return [input_dict[x] for x in sorted(input_dict.keys())] + class Transition: def __init__(self, orig_state, dest_state, name, energy = 0, energy_function = None, duration = 0, duration_function = None, timeout = 0, timeout_function = None, is_interrupt = False, - arguments = [], param_update_function = None): + arguments = [], param_update_function = None, + arg_to_param_map = None): self.name = name self.origin = orig_state self.destination = dest_state @@ -17,26 +30,31 @@ class Transition: self.is_interrupt = is_interrupt self.arguments = arguments.copy() self.param_update_function = param_update_function + self.arg_to_param_map = arg_to_param_map - def get_duration(self, parameters = [], args = []): + def get_duration(self, param_dict = {}, args = []): if self.duration_function: - return self.duration_function(parameters, args) + return self.duration_function.eval(_dict_to_list(param_dict), args) return self.duration - def get_energy(self, parameters = [], args = []): + def get_energy(self, param_dict = {}, args = []): if self.energy_function: - return self.energy_function(parameters, args) + return self.energy_function.eval(_dict_to_list(param_dict), args) return self.energy - def get_timeout(self, parameters = []): + def get_timeout(self, param_dict = {}): if self.timeout_function: - return self.timeout_function(parameters) + return self.timeout_function.eval(_dict_to_list(param_dict)) return self.timeout - def get_params_after_transition(self, parameters, args = []): + def get_params_after_transition(self, param_dict, args = []): if self.param_update_function: - return self.param_update_function(parameters, args) - return parameters + return self.param_update_function(param_dict, args) + if self.arg_to_param_map: + ret = {} + #for arg_index in range(self.arg_to_param_map): + # if self.arg_to_param_map[arg_index] + return param_dict class State: def __init__(self, name, power = 0, power_function = None): @@ -45,12 +63,19 @@ class State: self.power_function = power_function self.outgoing_transitions = {} + """@classmethod + def from_json(cls, serialized_state): + if 'power' in serialized_state: + cls.power = serialized_state['power']['static'] + if 'function' in serialized_state: + cls.power_function = """ + def add_outgoing_transition(self, new_transition): self.outgoing_transitions[new_transition.name] = new_transition - def get_energy(self, duration, parameters = []): + def get_energy(self, duration, param_dict = {}): if self.power_function: - return self.power_function(parameters) * duration + return self.power_function.eval(_dict_to_list(param_dict)) * duration return self.power * duration def get_transition(self, transition_name): @@ -92,11 +117,18 @@ class PTA: self.states['UNINITIALIZED'] = State('UNINITIALIZED') def add_state(self, state_name, **kwargs): + if 'power_function' in kwargs: + kwargs['power_function'] = AnalyticFunction(kwargs['power_function'], + self.parameters, 0) self.states[state_name] = State(state_name, **kwargs) def add_transition(self, orig_state, dest_state, function_name, **kwargs): orig_state = self.states[orig_state] dest_state = self.states[dest_state] + for key in ('duration_function', 'energy_function', 'timeout_function'): + if key in kwargs: + kwargs[key] = AnalyticFunction(kwargs[key], self.parameters, 0) + new_transition = Transition(orig_state, dest_state, function_name, **kwargs) self.transitions.append(new_transition) orig_state.add_outgoing_transition(new_transition) @@ -108,26 +140,26 @@ class PTA: total_duration = 0. total_energy = 0. state = self.states[orig_state] - parameters = self.initial_param_values + param_dict = dict([[self.parameters[i], self.initial_param_values[i]] for i in range(len(self.parameters))]) for function in trace: function_name = function[0] function_args = function[1 : ] if function_name == 'sleep': duration = function_args[0] - total_energy += state.get_energy(duration, parameters) + total_energy += state.get_energy(duration, param_dict) total_duration += duration else: transition = state.get_transition(function_name) - total_duration += transition.get_duration(parameters, function_args) - total_energy += transition.get_energy(parameters, function_args) - parameters = transition.get_params_after_transition(parameters, function_args) + total_duration += transition.get_duration(param_dict, function_args) + total_energy += transition.get_energy(param_dict, function_args) + param_dict = transition.get_params_after_transition(param_dict, function_args) state = transition.destination while (state.has_interrupt_transitions()): - transition = state.get_next_interrupt(parameters) - duration = transition.get_timeout(parameters) + transition = state.get_next_interrupt(param_dict) + duration = transition.get_timeout(param_dict) total_duration += duration - total_energy += state.get_energy(duration, parameters) - parameters = transition.get_params_after_transition(parameters) + total_energy += state.get_energy(duration, param_dict) + param_dict = transition.get_params_after_transition(param_dict) state = transition.destination - return total_energy, total_duration, state, parameters + return total_energy, total_duration, state, param_dict -- cgit v1.2.3