diff options
-rw-r--r-- | lib/lennart/DataProcessor.py | 136 | ||||
-rw-r--r-- | lib/pelt.py | 24 |
2 files changed, 88 insertions, 72 deletions
diff --git a/lib/lennart/DataProcessor.py b/lib/lennart/DataProcessor.py index ec17ca8..2554dba 100644 --- a/lib/lennart/DataProcessor.py +++ b/lib/lennart/DataProcessor.py @@ -2,6 +2,7 @@ import numpy as np import logging import os +import scipy from bisect import bisect_left, bisect_right logger = logging.getLogger(__name__) @@ -239,85 +240,84 @@ class DataProcessor: transition_start_candidate_weights.append( list( map( - lambda k: (timestamps[k], candidate_weight[k]), + lambda k: ( + timestamps[k], + timestamps[k] - expected_start_ts, + candidate_weight[k], + ), candidate_weight.keys(), ) ) ) - """ - # drift between expected and actual / estimated start timestamps at the previous transition. - # For the first transition, the "previous transition" is the led sync pulse, which has already - # been adjusted, so we have a guaranteed drift of 0. - drift = 0 - for i, expected_start_ts in enumerate(expected_transition_start_timestamps): - # assumption: after adjusting for the previous drift, the actual start timestamp is ± 1 ms away. - expected_start_ts += drift - """ - candidates = sorted( - map(lambda x: x[0], transition_start_candidate_weights[i]) - ) - - expected_start_ts += drift - expected_end_ts = sync_timestamps[i * 2 + 1] + drift + # Algorithm: Obtain the shortest path in a layered graph made up from + # transition candidates. Each node represents a transition candidate timestamp, and each layer represents a transition. + # Each node in layer i contains a directed edge to each node in layer i+1. + # The edge weight is the drift delta between the two nodes. So, if, + # node X (transition i, candidate a) has a drift of 5, and node Y + # (transition i+1, candidate b) has a drift of -2, the weight is 7. + # The first and last layer of the graph consists of a single node + # with a drift of 0, representing the start / end synchronization pulse, respectively. + + prev_nodes = [0] + prev_drifts = [0] + edge_srcs = list() + edge_dsts = list() + csr_weights = list() + node_drifts = list() + for candidates in transition_start_candidate_weights: + new_nodes = list() + new_drifts = list() + i_offset = prev_nodes[-1] + 1 + for new_node_i, (_, new_drift, _) in enumerate(candidates): + new_node = new_node_i + i_offset + new_nodes.append(new_node) + new_drifts.append(new_drift) + node_drifts.append(new_drift) + for prev_node_i, prev_node in enumerate(prev_nodes): + prev_drift = prev_drifts[prev_node_i] + + edge_srcs.append(prev_node) + edge_dsts.append(new_node) + + delta_drift = np.abs(prev_drift - new_drift) + csr_weights.append(delta_drift) + + prev_nodes = new_nodes + prev_drifts = new_drifts + + # add an end node for shortest path search + # (end node == final sync, so drift == 0) + new_node = prev_nodes[-1] + 1 + for prev_node_i, prev_node in enumerate(prev_nodes): + prev_drift = prev_drifts[prev_node_i] + edge_srcs.append(prev_node) + edge_dsts.append(new_node) + csr_weights.append(np.abs(prev_drift)) + + sm = scipy.sparse.csr_matrix( + (csr_weights, (edge_srcs, edge_dsts)), shape=(new_node + 1, new_node + 1) + ) + dm, predecessors = scipy.sparse.csgraph.shortest_path( + sm, return_predecessors=True, indices=0 + ) - # Wähle die beiden nächsten Kandidaten um den erwarteten Sync-Punkt, einmal nach links - # (kleinerer Timestamp) und einmal nach rechts (größerer Timestamp) - right_sync = bisect_left(candidates, expected_start_ts) - left_sync = right_sync - 1 + nodes = list() + pred = predecessors[-1] + while pred > 0: + nodes.append(pred) + pred = predecessors[pred] - if left_sync > 0: - left_diff = expected_start_ts - candidates[left_sync] - else: - left_diff = None + # first and graph nodes are not included in "nodes" as they represent + # the start/stop sync pulse (and not a transition with sync candidates) - if right_sync < len(candidates): - right_diff = candidates[right_sync] - expected_start_ts - else: - right_diff = None - - if left_diff is None and right_diff is None: - # compensated_timestamps.append(None) - # compensated_timestamps.append(None) - compensated_timestamps.append(expected_start_ts) - compensated_timestamps.append(expected_end_ts) - continue - - if right_diff is None and left_diff < 5e-4: - print(expected_start_ts, drift, -left_diff) - compensated_timestamps.append(expected_start_ts - left_diff) - compensated_timestamps.append(expected_end_ts - left_diff) - drift -= left_diff - continue - - if left_diff is None and right_diff < 5e-4: - print(expected_start_ts, drift, right_diff) - compensated_timestamps.append(expected_start_ts + right_diff) - compensated_timestamps.append(expected_end_ts + right_diff) - drift += right_diff - continue - - if left_diff is not None and right_diff is not None: - if left_diff < right_diff and left_diff < 1e-3: - print(expected_start_ts, drift, -left_diff) - compensated_timestamps.append(expected_start_ts - left_diff) - compensated_timestamps.append(expected_end_ts - left_diff) - drift -= left_diff - continue - if right_diff < left_diff and right_diff < 1e-3: - print(expected_start_ts, drift, right_diff) - compensated_timestamps.append(expected_start_ts + right_diff) - compensated_timestamps.append(expected_end_ts + right_diff) - drift += right_diff - continue - - # compensated_timestamps.append(None) - # compensated_timestamps.append(None) + for i, node in enumerate(reversed(nodes)): + drift = node_drifts[node] + expected_start_ts = sync_timestamps[i * 2] + drift + expected_end_ts = sync_timestamps[i * 2 + 1] + drift compensated_timestamps.append(expected_start_ts) compensated_timestamps.append(expected_end_ts) - # TODO calculate drift for "None" timestamps based on the previous and next known drift value - if os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"): import json from dfatool.utils import NpEncoder diff --git a/lib/pelt.py b/lib/pelt.py index 58bc9c1..bd79c9b 100644 --- a/lib/pelt.py +++ b/lib/pelt.py @@ -34,6 +34,7 @@ def PELT_get_raw_states(num_measurement, algo, penalty): class PELT: def __init__(self, **kwargs): + self.algo = "pelt" self.model = "l1" self.jump = 1 self.min_dist = 10 @@ -45,6 +46,7 @@ class PELT: self.__dict__.update(kwargs) if os.getenv("DFATOOL_PELT_MODEL"): + # https://centre-borelli.github.io/ruptures-docs/user-guide/costs/costl1/ self.model = os.getenv("DFATOOL_PELT_MODEL") if os.getenv("DFATOOL_PELT_JUMP"): self.jump = int(os.getenv("DFATOOL_PELT_JUMP")) @@ -73,7 +75,7 @@ class PELT: normed_signal[i] = normed_signal[i] * scaler return normed_signal - def get_penalty_and_changepoints(self, signal, penalty=None): + def get_penalty_and_changepoints(self, signal, penalty=None, num_changepoints=None): # imported here as ruptures is only used for changepoint detection. # This way, dfatool can be used without having ruptures installed as # long as --pelt isn't active. @@ -84,9 +86,17 @@ class PELT: else: self.jump = 1 - algo = ruptures.Pelt( - model=self.model, jump=self.jump, min_size=self.min_dist - ).fit(self.norm_signal(signal)) + if self.algo == "dynp": + # https://centre-borelli.github.io/ruptures-docs/user-guide/detection/dynp/ + algo = ruptures.Dynp( + model=self.model, jump=self.jump, min_size=self.min_dist + ) + else: + # https://centre-borelli.github.io/ruptures-docs/user-guide/detection/pelt/ + algo = ruptures.Pelt( + model=self.model, jump=self.jump, min_size=self.min_dist + ) + algo = algo.fit(self.norm_signal(signal)) if penalty is not None: changepoints = algo.predict(pen=penalty) @@ -94,6 +104,12 @@ class PELT: changepoints.pop() return penalty, changepoints + if self.algo == "dynp" and num_changepoints is not None: + changepoints = algo.predict(pen=penalty) + if len(changepoints) and changepoints[-1] == len(signal): + changepoints.pop() + return None, changepoints + queue = list() for i in range(0, 100): queue.append((algo, i)) |