summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/lennart/DataProcessor.py136
-rw-r--r--lib/pelt.py24
2 files changed, 88 insertions, 72 deletions
diff --git a/lib/lennart/DataProcessor.py b/lib/lennart/DataProcessor.py
index ec17ca8..2554dba 100644
--- a/lib/lennart/DataProcessor.py
+++ b/lib/lennart/DataProcessor.py
@@ -2,6 +2,7 @@
import numpy as np
import logging
import os
+import scipy
from bisect import bisect_left, bisect_right
logger = logging.getLogger(__name__)
@@ -239,85 +240,84 @@ class DataProcessor:
transition_start_candidate_weights.append(
list(
map(
- lambda k: (timestamps[k], candidate_weight[k]),
+ lambda k: (
+ timestamps[k],
+ timestamps[k] - expected_start_ts,
+ candidate_weight[k],
+ ),
candidate_weight.keys(),
)
)
)
- """
- # drift between expected and actual / estimated start timestamps at the previous transition.
- # For the first transition, the "previous transition" is the led sync pulse, which has already
- # been adjusted, so we have a guaranteed drift of 0.
- drift = 0
- for i, expected_start_ts in enumerate(expected_transition_start_timestamps):
- # assumption: after adjusting for the previous drift, the actual start timestamp is ± 1 ms away.
- expected_start_ts += drift
- """
- candidates = sorted(
- map(lambda x: x[0], transition_start_candidate_weights[i])
- )
-
- expected_start_ts += drift
- expected_end_ts = sync_timestamps[i * 2 + 1] + drift
+ # Algorithm: Obtain the shortest path in a layered graph made up from
+ # transition candidates. Each node represents a transition candidate timestamp, and each layer represents a transition.
+ # Each node in layer i contains a directed edge to each node in layer i+1.
+ # The edge weight is the drift delta between the two nodes. So, if,
+ # node X (transition i, candidate a) has a drift of 5, and node Y
+ # (transition i+1, candidate b) has a drift of -2, the weight is 7.
+ # The first and last layer of the graph consists of a single node
+ # with a drift of 0, representing the start / end synchronization pulse, respectively.
+
+ prev_nodes = [0]
+ prev_drifts = [0]
+ edge_srcs = list()
+ edge_dsts = list()
+ csr_weights = list()
+ node_drifts = list()
+ for candidates in transition_start_candidate_weights:
+ new_nodes = list()
+ new_drifts = list()
+ i_offset = prev_nodes[-1] + 1
+ for new_node_i, (_, new_drift, _) in enumerate(candidates):
+ new_node = new_node_i + i_offset
+ new_nodes.append(new_node)
+ new_drifts.append(new_drift)
+ node_drifts.append(new_drift)
+ for prev_node_i, prev_node in enumerate(prev_nodes):
+ prev_drift = prev_drifts[prev_node_i]
+
+ edge_srcs.append(prev_node)
+ edge_dsts.append(new_node)
+
+ delta_drift = np.abs(prev_drift - new_drift)
+ csr_weights.append(delta_drift)
+
+ prev_nodes = new_nodes
+ prev_drifts = new_drifts
+
+ # add an end node for shortest path search
+ # (end node == final sync, so drift == 0)
+ new_node = prev_nodes[-1] + 1
+ for prev_node_i, prev_node in enumerate(prev_nodes):
+ prev_drift = prev_drifts[prev_node_i]
+ edge_srcs.append(prev_node)
+ edge_dsts.append(new_node)
+ csr_weights.append(np.abs(prev_drift))
+
+ sm = scipy.sparse.csr_matrix(
+ (csr_weights, (edge_srcs, edge_dsts)), shape=(new_node + 1, new_node + 1)
+ )
+ dm, predecessors = scipy.sparse.csgraph.shortest_path(
+ sm, return_predecessors=True, indices=0
+ )
- # Wähle die beiden nächsten Kandidaten um den erwarteten Sync-Punkt, einmal nach links
- # (kleinerer Timestamp) und einmal nach rechts (größerer Timestamp)
- right_sync = bisect_left(candidates, expected_start_ts)
- left_sync = right_sync - 1
+ nodes = list()
+ pred = predecessors[-1]
+ while pred > 0:
+ nodes.append(pred)
+ pred = predecessors[pred]
- if left_sync > 0:
- left_diff = expected_start_ts - candidates[left_sync]
- else:
- left_diff = None
+ # first and graph nodes are not included in "nodes" as they represent
+ # the start/stop sync pulse (and not a transition with sync candidates)
- if right_sync < len(candidates):
- right_diff = candidates[right_sync] - expected_start_ts
- else:
- right_diff = None
-
- if left_diff is None and right_diff is None:
- # compensated_timestamps.append(None)
- # compensated_timestamps.append(None)
- compensated_timestamps.append(expected_start_ts)
- compensated_timestamps.append(expected_end_ts)
- continue
-
- if right_diff is None and left_diff < 5e-4:
- print(expected_start_ts, drift, -left_diff)
- compensated_timestamps.append(expected_start_ts - left_diff)
- compensated_timestamps.append(expected_end_ts - left_diff)
- drift -= left_diff
- continue
-
- if left_diff is None and right_diff < 5e-4:
- print(expected_start_ts, drift, right_diff)
- compensated_timestamps.append(expected_start_ts + right_diff)
- compensated_timestamps.append(expected_end_ts + right_diff)
- drift += right_diff
- continue
-
- if left_diff is not None and right_diff is not None:
- if left_diff < right_diff and left_diff < 1e-3:
- print(expected_start_ts, drift, -left_diff)
- compensated_timestamps.append(expected_start_ts - left_diff)
- compensated_timestamps.append(expected_end_ts - left_diff)
- drift -= left_diff
- continue
- if right_diff < left_diff and right_diff < 1e-3:
- print(expected_start_ts, drift, right_diff)
- compensated_timestamps.append(expected_start_ts + right_diff)
- compensated_timestamps.append(expected_end_ts + right_diff)
- drift += right_diff
- continue
-
- # compensated_timestamps.append(None)
- # compensated_timestamps.append(None)
+ for i, node in enumerate(reversed(nodes)):
+ drift = node_drifts[node]
+ expected_start_ts = sync_timestamps[i * 2] + drift
+ expected_end_ts = sync_timestamps[i * 2 + 1] + drift
compensated_timestamps.append(expected_start_ts)
compensated_timestamps.append(expected_end_ts)
- # TODO calculate drift for "None" timestamps based on the previous and next known drift value
-
if os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"):
import json
from dfatool.utils import NpEncoder
diff --git a/lib/pelt.py b/lib/pelt.py
index 58bc9c1..bd79c9b 100644
--- a/lib/pelt.py
+++ b/lib/pelt.py
@@ -34,6 +34,7 @@ def PELT_get_raw_states(num_measurement, algo, penalty):
class PELT:
def __init__(self, **kwargs):
+ self.algo = "pelt"
self.model = "l1"
self.jump = 1
self.min_dist = 10
@@ -45,6 +46,7 @@ class PELT:
self.__dict__.update(kwargs)
if os.getenv("DFATOOL_PELT_MODEL"):
+ # https://centre-borelli.github.io/ruptures-docs/user-guide/costs/costl1/
self.model = os.getenv("DFATOOL_PELT_MODEL")
if os.getenv("DFATOOL_PELT_JUMP"):
self.jump = int(os.getenv("DFATOOL_PELT_JUMP"))
@@ -73,7 +75,7 @@ class PELT:
normed_signal[i] = normed_signal[i] * scaler
return normed_signal
- def get_penalty_and_changepoints(self, signal, penalty=None):
+ def get_penalty_and_changepoints(self, signal, penalty=None, num_changepoints=None):
# imported here as ruptures is only used for changepoint detection.
# This way, dfatool can be used without having ruptures installed as
# long as --pelt isn't active.
@@ -84,9 +86,17 @@ class PELT:
else:
self.jump = 1
- algo = ruptures.Pelt(
- model=self.model, jump=self.jump, min_size=self.min_dist
- ).fit(self.norm_signal(signal))
+ if self.algo == "dynp":
+ # https://centre-borelli.github.io/ruptures-docs/user-guide/detection/dynp/
+ algo = ruptures.Dynp(
+ model=self.model, jump=self.jump, min_size=self.min_dist
+ )
+ else:
+ # https://centre-borelli.github.io/ruptures-docs/user-guide/detection/pelt/
+ algo = ruptures.Pelt(
+ model=self.model, jump=self.jump, min_size=self.min_dist
+ )
+ algo = algo.fit(self.norm_signal(signal))
if penalty is not None:
changepoints = algo.predict(pen=penalty)
@@ -94,6 +104,12 @@ class PELT:
changepoints.pop()
return penalty, changepoints
+ if self.algo == "dynp" and num_changepoints is not None:
+ changepoints = algo.predict(pen=penalty)
+ if len(changepoints) and changepoints[-1] == len(signal):
+ changepoints.pop()
+ return None, changepoints
+
queue = list()
for i in range(0, 100):
queue.append((algo, i))