-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEMalgorithm.py
More file actions
73 lines (64 loc) · 2.37 KB
/
Copy pathEMalgorithm.py
File metadata and controls
73 lines (64 loc) · 2.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
"""
============================================================
EMalgorithm:
This algorithm is an EM Algorithm for Confounded Heterogenous Data
y \sim X\beta +Zu+\epsilon
u \sim N(0, I \sigma_u^2)
Y \sim N(X\beta, ZZ^T \epsilon_\sigma^2 + I \sigma_u^2)
============================================================
"""
import numpy as np
import math
class EM:
# y X Z is the observed data
# maxItr is the maximum iteration steps
# initialization
def __init__(self,maxItr=100):
self.maxItr=int(maxItr)
def fit(self,y=None,X=None,Z=None):
self.y = y
self.X = X
self.Z = Z
self.K = Z.dot(Z.T)
self.n,self.p = X.shape
self.q = Z.shape[1]
self.y = self.y.reshape(self.n,1)
self.beta = np.zeros((self.p, 1))
self.sigma_u = 0.1
self.sigma_epsilon = 0.1
self.beta_history=[]
self.loglikelihood_history=[]
self.sigma_epsilon_history=[]
for _ in xrange(0, self.maxItr):
u,utu = self.Estep()
self.Mstep(u,utu)
self.sigma_epsilon_history.append(self.sigma_epsilon)
self.beta_history.append(self.beta)
self.loglikelihood_history.append(self.get_loglikelihood(u,utu))
def Estep(self):
#print("_____EStep______")
Omega_inverse = np.linalg.pinv(self.sigma_u**2*self.K
+self.sigma_epsilon**2*np.eye(self.n) )
u = self.sigma_u**2*self.Z.T.dot(Omega_inverse).dot(
self.y-self.X.dot(self.beta))
Tr = self.sigma_u**2*np.eye(self.q)-self.sigma_u**4\
*self.Z.T.dot(Omega_inverse).dot(self.Z)
Tr = np.trace(Tr)
norm = self.sigma_u**2*self.Z.T.dot(Omega_inverse).dot(
self.y-self.X.dot(self.beta))
norm = np.linalg.norm(norm)**2
utu = Tr+norm
return u,utu
def Mstep(self, u, utu):
#print("_____MStep______")
self.sigma_u = math.sqrt(utu/self.q)
temp = self.y - self.X.dot(self.beta) - self.Z.dot(u)
self.sigma_epsilon = math.sqrt((temp.T.dot(temp))/self.n)
self.beta = np.linalg.pinv(self.X.T.dot(
self.X)).dot(self.X.T).dot(self.y-self.Z.dot(u))
def get_loglikelihood(self, u, utu):
temp = self.y-self.X.dot(self.beta)-self.Z.dot(u)
LE = -self.n*math.log(self.sigma_epsilon)-self.q*math.log(self.sigma_u)\
-temp.T.dot(temp)/(2*self.sigma_u**2)\
-utu/(2*self.sigma_u**2)
return float(LE)