source: branches/dev-5819/Python/src/pyjmi/ukf.py @ 13461

Last change on this file since 13461 was 13461, checked in by randersson, 3 months ago

#5819 Performed 2to3 on the entire src/Python directory in order to make the code compatible. I've not reviewed all changes while doing this commit but will do while starting to run the tests. I'm not sure everything is converted correctly, for example the change in test_casadi_collocation row 1610.

File size: 23.4 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4#    Copyright (C) 2014 Modelon AB
5#
6#    This program is free software: you can redistribute it and/or modify
7#    it under the terms of the GNU General Public License as published by
8#    the Free Software Foundation, version 3 of the License.
9#
10#    This program is distributed in the hope that it will be useful,
11#    but WITHOUT ANY WARRANTY; without even the implied warranty of
12#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13#    GNU General Public License for more details.
14#
15#    You should have received a copy of the GNU General Public License
16#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17"""
18Unscented Kalman Filter module
19"""
20
21import numpy as N
22import scipy as S
23import types 
24from scipy.sparse import lil_matrix, linalg
25from pyfmi.common.algorithm_drivers import OptionBase, InvalidAlgorithmOptionException, UnrecognizedOptionError
26import collections
27from pyfmi.fmi import FMUException
28from assimulo.solvers.sundials import CVodeError
29import random
30
31class UKF:
32    """A class representing a Non-augmented Unscented Kalman Filter.
33   
34    The sigma point distribution is chosen according to the method described in
35    "Unscented Kalman Filter for Nonlinear Estimation" by E.A Wan and
36    R. van der Merwe (2000).
37   
38    The algorithm is the non-augmented version described in
39    "Unscented Kalman Filter using Augmented State in Presence of Additive Noise"
40    by Fuming, S. et al. (2009)
41   
42    NOTES:
43    -All random disturbances are considered additive, uncorrelated and zero-mean.
44   
45    -All states and measurements are indexed in alphabetical order internally
46    when represented in arrays.
47   
48    -Inputs must be in the form of a tuple according to FMI standard for simulation, example:
49        input_traj = N.transpose(N.vstack((t, u1, u2))) #time, input 1 and input 2 in a matrix
50        input = (['u1', 'u2'], input_traj)    #Input to predict step
51       
52    -All variables are scaled down internally with their nominal values
53   
54    Attributes:
55        model -- Observer model (FMUModel)
56        options -- UKF options containing covariances and weight parameters (UKFOptions)
57        h -- Sampling interval in seconds (float)
58        currTime -- The current time in the observer model (float)
59        x  -- Contains the current state estimates, sorted alphabetically ([ScaledVariable])
60        mes -- Contains the measured variables ([ScaledVariable])
61        P --  Estimated state covariance matrix (numpy.array)
62        P_v -- Assumed process noise covariance matrix (numpy.array)
63        P_n -- Assumed measurement noise covariance matrix (numpy.array)
64        Wm -- Weights for mean calculation (numpy.array)
65        Wc -- Weights for covariance calculation (numpy.array)
66        K -- Kalman filter gain (numpy.array)
67        xp -- The a priori predicted state estimates (numpy.array)
68        yp -- The a priori predicted measurements (numpy.array)
69        fails -- Dict containing the number of failed sigma-point simulations at each time instance ({float:int})
70    """
71   
72    def __init__(self, model, x_0, measurements, h, options):
73        """Constructor
74       
75            Arguments:
76            model -- Observer model (FMUModel)
77            x_0  -- Initial state estimate vector ({string:float})
78            measurements -- A list of the measured states ([string])
79                        h -- Sample interval (float)
80                        options -- UKF options containing initial covariances and parameters (UKFOptions)
81           
82        """
83       
84        #Assign attributes       
85        self.model = model
86        self.options = options.copy()
87        self.h = h
88        self.currTime = 0.0
89       
90        #Form sorted list of scaled state variables
91        x = []
92        for state in x_0:
93            x = x + [ScaledVariable(state, x_0[state], model.get_variable_nominal(state))]
94        self.x = sorted(x, key=ScaledVariable.get_name)
95       
96        #Form sorted list of scaled measurement variables
97        mes = []
98        for var in measurements:
99            mes = mes + [ScaledVariable(var, 0.0, model.get_variable_nominal(var))]
100        self.mes = sorted(mes, key=ScaledVariable.get_name)
101       
102        #Form the state covariance matrix with scaled covariances
103        P = N.eye(len(options['P_0']))
104        for i,state in enumerate(self.x):
105            P[i,i] = options['P_0'][state.get_name()]/state.get_nominal_value()**2
106        self.P = P
107       
108        #Form the process noise covariance matrix with scaled covariances
109        P_v = N.eye(len(options['P_v']))
110        for i,state in enumerate(self.x):
111            P_v[i,i] = options['P_v'][state.get_name()]/state.get_nominal_value()**2
112        self.P_v = P_v
113       
114        #Form the measurement noise covariance matrix with scaled covariances
115        P_n = N.eye(len(options['P_n']))
116        for i,measurement in enumerate(self.mes):
117            P_n[i,i] = options['P_n'][measurement.get_name()]/measurement.get_nominal_value()**2
118        self.P_n = P_n
119       
120        #Form Kalman gain, predicted state vector, predicted measurement vector and fails dict
121        self.K = N.zeros((len(self.x),len(self.mes)))
122        self.xp = N.zeros((len(self.x),1))
123        self.yp = N.zeros((len(self.mes),1))
124        self.fails = {}
125       
126        #Calculate and assign sigma point weights
127        [Wm, Wc] = self._calc_weights(options)                                     
128        self.Wm = Wm
129        self.Wc = Wc
130       
131        #Make sure the model is properly reset
132        self.model.reset()
133       
134    def _calc_weights(self, options):
135        """Calculate weights for sigma points.
136        Needs to be done only when changing parameters of alpha, beta and kappa
137       
138            Arguments:
139            options -- current options for the UKF (UKFOptions)
140           
141            Returns:
142            Wm -- Weights for mean calculation (numpy.array)
143            Wc -- Weights for covariance calculation (numpy.array)
144           
145        """
146       
147        L = len(options['P_0']) #Dimension of state vector
148        lambd = (options['alpha']**2) * (L + options['kappa']) - L   
149        c = L + lambd
150        Wm = (0.5 / c) * N.ones(2 * L + 1)
151        Wm[0] = lambd / c 
152        Wc = N.copy(Wm)
153        Wc[0] = Wc[0] + (1.0 - options['alpha']**2 + options['beta'])
154        return [N.copy(Wm), N.copy(Wc)]
155   
156    def update_options(self, *args, **kw):
157        """Updates the options and the sigma point weights
158       
159        """
160       
161        #Update options attribute
162        self.options.update(*args, **kw)
163       
164        #Update weights
165        [Wm, Wc] = self._calc_weights(self.options)
166        self.Wm = Wm
167        self.Wc = Wc
168       
169        #Update noise covariances
170        P_v = N.eye(len(self.options['P_v']))
171        for i,state in enumerate(self.x):
172            P_v[i,i] = self.options['P_v'][state.get_name()]/state.get_nominal_value()**2
173        self.P_v = P_v
174       
175        P_n = N.eye(len(self.options['P_n']))
176        for i,measurement in enumerate(self.mes):
177            P_n[i,i] = self.options['P_n'][measurement.get_name()]/measurement.get_nominal_value()**2
178        self.P_n = P_n
179       
180    def get_options(self):
181        """Returns a copy of the current options for the UKF.
182       
183            Returns:
184            options -- The current options of the UKF (UKFOptions)
185       
186        """
187       
188        return self.options.copy()
189       
190    def _calc_sigma(self, x, P, P_v, P_n, options):
191        """Calculates the sigma points for a state estimate vector.
192           
193            Arguments:
194            x -- Current state estimate vector ([ScaledVariable])
195            P -- Current scaled estimate of state covariance (numpy.array)
196            P_v -- Scaled process noise covariance (numpy.array)
197            P_n -- Scaled measurement noise covariance (numpy.array)
198            options -- options containing weight parameters (UKFOptions)
199           
200            Returns:
201            sigma -- the sigma matrix, whose columns corresponds to the sigma points.
202       
203       """
204       
205        #State vector dimension
206        L = len(options['P_0'])                                             
207       
208        #Represent current state estimate as an array
209        x_a = []
210        for state in x:
211            x_a = x_a + [state.get_scaled_value()]
212        x_a = N.array(x_a)                                                                           
213       
214        #Calculate the cholesky decomposition of the covariance matrix
215        P = P * (options['alpha']**2) * (L + options['kappa'])
216       
217        try:
218            P_sqrt = S.linalg.cholesky(P, lower = True)
219        except N.linalg.LinAlgError:
220            print('The covariance matrix was not positive definite:')
221            print('P = ')
222            print(P)
223            raise
224   
225        #Calculate sigma matrix
226        sigma = N.zeros((L, 2 * L + 1))
227        sigma[:,0] = x_a                      #First sigma point is the estimated augmented state vector
228        for i in range(1, L + 1):
229            sigma[:,i] = x_a + P_sqrt[:,(i-1)]
230        for i in range(L + 1, 2 * L + 1):
231            sigma[:,i] = x_a - P_sqrt[:,i - (L + 1)] 
232        return sigma
233       
234    def update(self,y):
235       
236        """Public method for updating the a priori estimate with info from the latest measurement.
237        Calls _update.
238           
239            Argument:
240            y -- Measurements (non-scaled) ({string:float})
241           
242            Returns:
243            x -- State estimate (non-scaled) ({string:float})
244       
245        """
246       
247        #Sort measurements alphabetically and extract values into scaled form
248        y = collections.OrderedDict(sorted(list(y.items()), key = lambda t: t[0]))
249        y_scaled = []
250        for measurement in self.mes:
251            y_scaled = y_scaled + [y[measurement.get_name()]/measurement.get_nominal_value()]
252        y_scaled = N.array(y_scaled)
253        y_scaled = N.reshape(y_scaled, (-1,1))
254   
255        #Retrieve updated scaled state estimate vector
256        x_scaled = self._update(y_scaled, self.xp, self.yp, self.K) 
257   
258        #Set new values in list of states, and return a dictionary with the actual values
259        i = 0
260        x_out = {}
261        for state in self.x:   
262            state.set_scaled_value(x_scaled[i,0])
263            x_out[state.get_name()] = state.get_actual_value()
264            i = i + 1
265        return x_out
266       
267    def _update(self,y, xp, yp, K):
268       
269        """Updates the a priori estimate with info from the latest measurement.
270       
271            Arguments:
272            y -- Scaled measurements (numpy.array)
273            xp -- Scaled predicted state estimation vector (numpy.array)
274            yp -- Scaled predicted measurement (numpy.array)
275            K -- Scaled Kalman filter gain (numpy.array))
276           
277            Returns:
278            x -- Updated scaled state estimation vector (numpy.array)
279       
280        """
281        x = xp + K.dot(y - yp)
282       
283        return x
284   
285    def predict(self, u=(), known_values={}):
286   
287        """Public wrapper method for predicting the state estimate at the next sample instant.
288        Calls _predict.
289       
290            Arguments:
291            u -- Input trajectory to process model (([string], numpy.array))
292            known_values -- A dict containing known state values ({string:float})
293        """
294   
295        #Calculate sigma points
296        sigma = self._calc_sigma(self.x, self.P, self.P_v, self.P_n, self.options) 
297 
298        #Do a prediction of states and measurements, and calculate Kalman filter gain
299        [xp, yp, K, P] = self._predict(sigma, self.model, self.x, u, known_values, self.P_v, self.P_n, self.currTime, self.h, self.mes, self.Wm, self.Wc)
300   
301        #Update attributes
302        self.currTime = self.currTime + self.h
303        self.xp = xp
304        self.yp = yp
305        self.K = K
306        self.P = P
307   
308    def _predict(self, sigma, model, x, u, known_values, P_v, P_n, currTime, h, measurements, Wm, Wc):
309        """"Predict the state estimate at the next sample instant.
310       
311            Arguments:
312            sigma -- the sigma matrix, whose columns corresponds to the sigma points of the
313            augmented state vector. (numpy.array)
314            model -- Observer model (FMUModel)
315            x -- The current state estimates ([ScaledVariable])
316            u -- Input trajectory to the process model (([string], numpy.array))
317            known_values -- Known state values ({string:float})
318            P_v -- Assumed process noise covariance matrix (numpy.array)
319            P_n -- Assumed measurement noise covariance matrix (numpy.array)
320            currTime -- Current time instant (float)
321            h -- Sample interval in seconds (float)
322            measurements -- Contains the measured variables ([ScaledVariable])
323            Wm -- Weights for mean calculation (numpy.array)
324            Wc -- Weights for covariance calculation (numpy.array)
325           
326                        Returns:
327            xp -- The scaled a priori predicted states (numpy.array)
328            yp -- The scaled a priori predicted measurements (numpy.array)
329            K -- The scaled Kalman filter gain (numpy.array)
330            P -- The scaled estimated state covariance matrix (numpy.array)
331           
332        """
333       
334        #Calculate sigma matrix and extract sigma points
335        L_meas = len(measurements)     
336     
337        #Produce prediction of state estimate and measurement
338        Xxp = N.zeros(sigma.shape)
339        Y = N.zeros([L_meas, sigma.shape[1]])
340       
341        #Simulate each sigma point h seconds
342        for i in range(0, sigma.shape[1]):
343       
344            #If sigma point simulation fails, try perturbing initial value and start again. Maximum 10 times,
345            #then use the result of the last simulated sigma point.
346            retry = True
347            k = 1
348                       
349            while retry:
350                retry = False
351                #Reset the observer model
352                model.reset()
353           
354                #Set the initial states as the current sigma point
355                for j, state in enumerate(x):
356                    #If the sigma point has previously failed, try perturbing the state values
357                    if k > 1:
358                        dist = N.abs(sigma[j,0] - sigma[j,i])                #Distance in this coordinate to mean point
359                        sigma[j,i] = sigma[j,i] + random.gauss(0, k*1e-3*dist) #Perturb with 0.1% of distance as std. Increase times k after each iteration.
360                    model.set(state.get_name()+'_0', sigma[j,i]*state.get_nominal_value())
361                   
362                #Set known values
363                for known in known_values:
364                    model.set(known+'_0', known_values[known])
365                   
366                #Simulate and extract result
367                opt = model.simulate_options()
368                opt['CVode_options']['atol'] = 1e-8
369                opt['CVode_options']['rtol'] = 1e-6
370                print('Simulating sigma-point '+str(i+1)+' out of '+str(sigma.shape[1])+' :')
371                try:
372                    result = model.simulate(start_time = currTime, final_time = currTime + h, options = opt, input = u)
373                except (CVodeError, ValueError, FMUException) as e:
374                    print(e)
375                    retry = True
376                    print('Failed sigma point simulation')
377                    if k == 1:
378                        if currTime in list(self.fails.keys()):
379                            self.fails[currTime] = self.fails[currTime] + 1
380                        else:
381                            self.fails[currTime] = 1
382                    if k == 10:
383                        print('Simulation failed 10 times, will use result from last sigma point instead')
384                        break
385                k = k + 1
386                       
387            #Assign result to matrix, and add process noise. Initialize anew so that all variables are updated
388            for k, state in enumerate(x):
389                Xxp[k,i] = result[state.get_name()][-1]/state.get_nominal_value()             
390     
391            #Assign measurements and add measurement noise
392            for k, meas in enumerate(measurements):
393                Y[k,i] = result[meas.get_name()][-1]/meas.get_nominal_value()
394       
395        #Compute predictions and covariances
396        xp = N.multiply(Wm, Xxp[:])                         #Multiply each point with corresponding weight
397        yp = N.multiply(Wm, Y[:])
398        xp = xp.sum(axis = 1)                               #Produce the weighted sum
399        yp = yp.sum(axis = 1) 
400        xp = N.reshape(xp, (-1,1))                          #Make them 2-D column vectors
401        yp = N.reshape(yp, (-1,1))
402       
403        Pxx = N.zeros((Xxp.shape[0],Xxp.shape[0]))          #State covariance
404        Pyy = N.zeros((Y.shape[0],Y.shape[0]))              #Measurement covariance
405        Pxy = N.zeros((Xxp.shape[0],Y.shape[0]))            #Cross-covariance
406       
407        for i in range(0,Xxp.shape[1]):
408            x_dev = N.reshape(Xxp[:,i],(-1,1)) - xp
409            y_dev = N.reshape(Y[:,i], (-1,1))- yp     
410            Pxx = Pxx + self.Wc[i] * N.outer(x_dev, x_dev)
411            Pyy = Pyy + self.Wc[i] * N.outer(y_dev, y_dev)
412            Pxy = Pxy + self.Wc[i] * N.outer(x_dev,y_dev)
413       
414        Pxx = Pxx + P_v
415        Pyy = Pyy + P_n
416     
417        #Calculate Kalman filter gain
418        if Pyy.shape[0] == 1 or Pyy.shape[1] == 1:
419            invPyy = 1 / Pyy   
420        else:
421            invPyy = N.linalg.solve(Pyy,N.eye(Pyy.shape[0]))
422       
423        K = Pxy.dot(invPyy)
424       
425        #Calculate state covariance
426        P = Pxx - K.dot(Pyy.dot(K.T))
427        return [xp, yp, K, P]
428       
429class UKFOptions(OptionBase):
430    """Class containing covariance matrices and weight parameters for the UKF.
431    The covariance matrices are considered diagonal, with the variance of each
432    state or disturbance on state/measurement represented in a dict with the
433    state/measurement name as key and variance as value.
434   
435    Extends pyfmi.common.algorithm_drivers.OptionsBase
436   
437    Attributes:
438        P_0 -- Initial state estimate covariance matrix ({string:float})
439        P_v -- Process noise covariance matrix ({string:float})
440        P_n -- Measurement noise covariance matrix ({string:float})
441        alpha -- Scaling parameter for distribution of sigma points, should be
442            between 0 and 1 (float)
443        beta -- Parameter for including prior knowledge of state variance,
444            where beta = 2 is optimal for Gaussian distributions (float)
445        kappa -- Secondary scaling parameter, used to ensure semi-positive definiteness
446            of covariance matrix. Usually set to zero (float)
447    """
448   
449    def __init__(self, *args, **kw):
450        """ Constructor
451        """
452     
453        #Set default values, and then update to user input arguments
454        defaults = {'P_0': {} , 'P_v': {}, 'P_n': {}, 'alpha': 1e-3, 'beta': 2.0, 'kappa':0.0}
455        super(UKFOptions, self).__init__(defaults)
456       
457        #Update options with user input
458        self.update(*args, **kw)
459           
460    def __setitem__(self, key, value):
461        """Set a specific option for the UKF
462       
463            Arguments:
464            key -- Specifies which parameter that should be updated (string)
465            value -- The new value that should be assigned
466                ({string:float} for covariances, float for weights)
467        """
468       
469        #Check if key is valid
470        if key not in self:
471            raise UnrecognizedOptionError(str(key)+' is not a valid option.')
472       
473        #If user put value as int, convert to float
474        if (key == 'alpha' or key == 'beta' or key == 'kappa') and type(value) == int:
475            value = float(value)
476       
477        if (key == 'P_0' or key == 'P_v' or key == 'P_n') and type(value) == dict:
478            for var in list(value.keys()):
479                if type(value[var]) == int:
480                    value[var] = float(value[var])
481                elif type(value[var]) != float and type(value[var]) != N.float64:
482                    raise InvalidAlgorithmOptionException('All values in '+str(key)+' must be floats or ints') 
483       
484        #Check for invalid option argument
485        if not (type(self[key]) == type(value)):
486            raise InvalidAlgorithmOptionException('Expected '+str(type(self[key]))+' for '+str(key)+' but got '+str(type(value)))   
487       
488        #Set the option
489        super(UKFOptions, self).__setitem__(key, value)
490   
491    def update(self, *args, **kw):
492        """Update the options for the UKF"""
493       
494        #Check for invalid options and arguments
495        keys = list(kw.keys())
496        for key in keys:
497            value = kw[key]
498            if key not in self:
499                raise UnrecognizedOptionError(str(key)+' is not a valid option.')
500            elif (key == 'alpha' or key == 'beta' or key == 'kappa') and type(value) == int:
501                value = float(value)
502            elif (key == 'P_0' or key == 'P_v' or key == 'P_n') and type(value) == dict:
503                for var in list(value.keys()):
504                    if type(value[var]) == int:
505                        value[var] = float(value[var])
506                    elif type(value[var]) != float:
507                        raise InvalidAlgorithmOptionException('All values in '+str(key)+' must be floats or ints') 
508            if not type(self[key]) == type(value):
509                raise InvalidAlgorithmOptionException('Expected '+str(type(self[key]))+' for '+str(key)+' but got '+str(type(kw[key])))   
510     
511           
512       
513        #Update the options
514        super(UKFOptions, self).update(*args, **kw)
515       
516class ScaledVariable:
517   
518    """A class representing a state or measurement variable, scaled with its nominal value
519   
520    Attributes:
521        name -- Name of variable (string)
522        nominal_value -- Nominal value of the variable (float)
523        actual_value -- Unscaled value of the variable (float)
524   
525    """
526   
527    def __init__(self, name, value, nominal_value):
528        """Constructor
529            Arguments:
530            name -- Name of variable (string)
531            value -- Unscaled value of variable (float)
532            nominal_value -- Nominal value of variable (float)
533       
534        """
535   
536        self.name = name
537        self.actual_value = float(value)
538        self.nominal_value = float(nominal_value)
539       
540    def get_name(self):
541        return self.name
542   
543    def __str__(self):
544        return '{' +self.name + ' | Value : '+str(self.actual_value)+' | Nominal : '+str(self.nominal_value)+'}\n'
545   
546    def __repr__(self):
547        return '{' +self.name + ' | Value : '+str(self.actual_value)+' | Nominal : '+str(self.nominal_value)+'}\n'
548       
549    def set_actual_value(self, actual_value):
550        self.actual_value = float(actual_value)
551       
552    def set_scaled_value(self, scaled_value):
553        self.actual_value = scaled_value * self.nominal_value
554       
555    def get_scaled_value(self):
556        return self.actual_value/self.nominal_value
557   
558    def get_actual_value(self):
559        return self.actual_value
560   
561    def get_nominal_value(self):
562        return self.nominal_value
563       
564       
Note: See TracBrowser for help on using the repository browser.