adctoolbox.fundamentals.fit_sine_4param 源代码

"""
Least Squares Sine Wave Fitting (IEEE Std 1057/1241).

Supports 3-parameter (frequency-fixed) and 4-parameter (frequency-optimized) fitting modes.
"""

import warnings
import numpy as np

[文档] def fit_sine_4param(data, frequency_estimate=None, max_iterations=1, tolerance=1e-9, verbose=0): """ Fit sine wave: y = A*cos(wt) + B*sin(wt) + C. Args: data: Input signal (1D or 2D array). frequency_estimate: Initial normalized frequency (0 to 0.5). If None, estimated via FFT. max_iterations: Iterations for frequency refinement. tolerance: Convergence threshold for frequency updates. verbose: Verbosity level (0=silent, >0=print iteration info). Returns: Dictionary with fitted parameters: - fitted_signal: Reconstructed sine wave - residuals: Data - fitted_signal - frequency: Normalized frequency (0 to 0.5) - amplitude: sqrt(A² + B²) - phase: atan2(-B, A) in radians - dc_offset: DC component - rmse: Root mean square error For 2D input, all values (except fitted_signal, residuals) are 1D arrays. """ data = np.asarray(data) if data.ndim == 1: return _fit_core(data, frequency_estimate, max_iterations, tolerance, verbose) if data.ndim == 2: results = [_fit_core(ch, frequency_estimate, max_iterations, tolerance, verbose) for ch in data.T] return _merge_results(results) raise ValueError(f"Input must be 1D or 2D, got {data.ndim}D.")
def _fit_core(y, freq_init, max_iter, tol, verbose=0): """Fit sine wave to 1D signal using least squares.""" n = len(y) t = np.arange(n) freq = freq_init if freq_init is not None else _estimate_frequency_fft(y) a = b = c = 0.0 converged = False for i in range(max_iter + 1): omega = 2 * np.pi * freq cos_vec = np.cos(omega * t) sin_vec = np.sin(omega * t) # Build design matrix: 3-param on iteration 0, 4-param on later iterations if i == 0: design_matrix = np.column_stack((cos_vec, sin_vec, np.ones(n))) else: freq_corr = t * (-a * sin_vec + b * cos_vec) design_matrix = np.column_stack((cos_vec, sin_vec, np.ones(n), freq_corr)) coeffs = np.linalg.lstsq(design_matrix, y, rcond=None)[0] a, b, c = coeffs[:3] if len(coeffs) > 3: delta_freq = coeffs[3] / (2 * np.pi) freq += delta_freq # Clamp frequency to valid range freq = np.clip(freq, 1e-10, 0.5 - 1e-10) if verbose > 0: print(f"Freq iterating ({i}): freq = {freq}, " f"delta_freq = {delta_freq}") if abs(delta_freq) < tol: converged = True break # Warn if max_iterations > 0 and loop did not converge if max_iter > 0 and not converged: warnings.warn( f"fit_sine_4param did not converge in {max_iter} iterations.", RuntimeWarning, stacklevel=3, ) omega = 2 * np.pi * freq fitted_sig = a * np.cos(omega * t) + b * np.sin(omega * t) + c residuals = y - fitted_sig return { 'fitted_signal': fitted_sig, 'residuals': residuals, 'frequency': freq, 'amplitude': np.sqrt(a**2 + b**2), 'phase': np.arctan2(-b, a), 'dc_offset': c, 'rmse': np.sqrt(np.mean(residuals**2)) } def _merge_results(results_list): """Merge list of result dicts into single dict with stacked arrays.""" merged = {} for key in results_list[0].keys(): values = [r[key] for r in results_list] if np.ndim(values[0]) == 0: # Scalar merged[key] = np.array(values) else: # Array (fitted_signal, residuals) merged[key] = np.column_stack(values) return merged def _estimate_frequency_fft(y): """Estimate frequency using FFT peak with parabolic interpolation.""" n = len(y) spec = np.abs(np.fft.fft(y)) spec[0] = 0 spec = spec[:n // 2] k = np.argmax(spec) # Parabolic interpolation for sub-bin precision if 0 < k < len(spec) - 1: r = 1 if spec[k + 1] > spec[k - 1] else -1 delta = r * spec[k + r] / (spec[k] + spec[k + r]) k += delta return k / n