-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlightcache.py
More file actions
241 lines (194 loc) · 6.43 KB
/
lightcache.py
File metadata and controls
241 lines (194 loc) · 6.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
import socket
import time
import re
import os
import sys
import grp
import pwd
import evhttpconn
import ev
import compressor
import settings
from caching_client import CachingClient
class Page(object):
    """Cached state for one (host, path) entry and the clients waiting on it."""

    def __init__(self, cache, key, headers):
        """Create an empty page that has not been fetched yet.

        cache   -- the Cache instance this page belongs to
        key     -- the key the page is stored under in cache.pages
                   (Cache.serve_page builds it as a (host, path) tuple)
        headers -- the raw request that is sent to the backend to fetch
                   this page (and re-sent by Cache.refresh)
        """
        created = time.time()
        self.cache = cache
        self.key = key
        self.can_cache = True
        self.headers = headers
        self.response_headers_len = 0
        self.response = ''
        self.partial_response = ''
        self.complete = False
        self.compressed = ''
        # Both timestamps start at creation time.
        self.last_access = created
        self.last_fetch = created
        # Clients attached to this page while it is still being fetched.
        self.listeners = []
        # The backend connection is assigned by Cache only after a
        # successful connect; default it so that reading page.backend on a
        # page whose connect failed does not raise AttributeError.
        self.backend = None
class Cache(object):
    """In-memory page store keyed by (host, path) with a byte budget.

    ``pages`` maps keys to Page objects; ``allocated`` counts the bytes
    charged via reserve() against settings.max_memory.
    """

    def __init__(self):
        self.pages = {}
        self.allocated = 0

    @staticmethod
    def _connect_backend():
        """Return a socket connected to settings.backend_address.

        The socket is closed before the error is re-raised when the
        connect fails, so a failed attempt does not leave a descriptor
        behind.
        """
        sock = socket.socket()
        try:
            sock.connect(settings.backend_address)
        except IOError:
            sock.close()
            raise
        return sock

    def serve_page(self, host, path, headers, client):
        """Answer ``client`` from the cache or start fetching the page.

        * Complete page: send the compressed body when the client accepts
          deflate and one exists, otherwise the plain response, then
          terminate the client.
        * Page still being fetched and cacheable: send what has arrived so
          far and add the client to the page's listeners.
        * Otherwise: create a new Page, connect to the backend and send it
          ``headers``.  On IOError the client gets a 'bad gateway' error.
        """
        key = (host, path)
        page = self.pages.get(key)
        if page:
            page.last_access = time.time()
            if page.complete:
                if client.accept_deflate and page.compressed:
                    client.send(page.compressed)
                else:
                    client.send(page.response)
                client.terminate()
                return
            if page.can_cache:
                client.send(page.partial_response)
                page.listeners.append(client)
                return

        # start a new request
        page = Page(self, key, headers)
        page.listeners.append(client)
        try:
            sock = self._connect_backend()
            page.backend = CachingClient(loop, sock, page)
            page.backend.send(headers)
        except IOError:
            client.send_error('bad gateway')
            # Do not register a page whose fetch never started: later
            # requests for this key would otherwise attach to it and wait
            # for data that is not coming.
            return
        self.pages[key] = page

    def purge(self, page):
        """Drop ``page``'s content, release its bytes and forget it."""
        page.can_cache = False
        self.release(len(page.response))
        self.release(len(page.partial_response))
        page.response = ''
        page.partial_response = ''
        page.headers = ''
        # Only remove the entry if it is still this very page; the key may
        # meanwhile point at a newer Page created by serve_page().
        if self.pages.get(page.key) is page:
            del self.pages[page.key]

    def reserve(self, bytes):
        """Charge ``bytes`` to the budget, evicting old pages if needed.

        Returns False when the request exceeds settings.max_memory or when
        there is nothing left to evict; True once the bytes are accounted.
        """
        if bytes > settings.max_memory:
            return False
        while bytes > (settings.max_memory - self.allocated):
            if not self.pages:
                # Nothing left to evict; min() on an empty sequence would
                # raise ValueError.
                return False
            # Evict the least recently accessed page first.
            oldest = min(self.pages.values(), key=lambda x: x.last_access)
            self.purge(oldest)
        self.allocated += bytes
        return True

    def release(self, bytes):
        """Return ``bytes`` to the budget."""
        self.allocated -= bytes

    def refresh(self):
        """Forget pages idle too long and re-fetch a stale one.

        A page not accessed for settings.max_cache_age seconds is purged.
        A page last fetched more than settings.cache_refresh seconds ago is
        requested from the backend again; if that fails it is purged.
        """
        now = time.time()
        expired = now - settings.cache_refresh
        forget = now - settings.max_cache_age
        # Iterate over a snapshot: purge() removes entries from self.pages.
        for page in list(self.pages.values()):
            if page.last_access < forget:
                self.purge(page)
                continue
            if page.last_fetch < expired:
                try:
                    sock = self._connect_backend()
                    page.backend = CachingClient(loop, sock, page)
                    page.backend.send(page.headers)
                    page.last_fetch = now
                except IOError:
                    self.purge(page)
                # NOTE(review): the flattened source does not show whether
                # this break belonged to the except clause only.  It is kept
                # at the `if` level, so each call re-fetches at most one
                # page -- confirm against the original layout.
                break
# Module-level cache instance; Client.on_headers_end serves pages through it
# and main() schedules its refresh() on a timer.
cache = Cache()
class BackendClient(evhttpconn.Connection):
    """Backend connection that passes everything it receives to one client.

    Created by Client.on_headers_end for requests that bypass the cache.
    """

    def __init__(self, loop, sock, client):
        """Wrap ``sock`` on ``loop`` and remember the client to relay to."""
        super(BackendClient, self).__init__(loop, sock)
        self.client = client

    def _relay(self, data):
        """Forward ``data`` unchanged to the attached client."""
        self.client.send(data)

    def on_headers_end(self, message):
        """Pass the backend's header block on to the client."""
        self._relay(message)

    def on_chunk(self, chunk):
        """Pass a piece of the backend's body on to the client."""
        self._relay(chunk)

    def on_complete(self):
        """Close this backend connection once the response is complete."""
        self.close()

    def on_close(self):
        """Run the base close handling, then terminate the client."""
        super(BackendClient, self).on_close()
        self.client.terminate()
class Client(evhttpconn.Connection):
    """Connection from a downstream HTTP client.

    Requests that may be cached are handed to the module-level ``cache``;
    everything else is passed to the backend through a BackendClient.
    """

    # Every Client that has been created and not yet closed.
    all_clients = []

    # Request headers that make a request bypass the cache.  The cache is
    # keyed on (host, path) only, so a response fetched with credentials
    # must never be stored and handed to other clients.
    _uncacheable_headers = ('cookie', 'authorization')

    def __init__(self, loop, sock, addr, server):
        """Register the connection; ``addr`` and ``server`` are unused."""
        super(Client, self).__init__(loop, sock)
        self.all_clients.append(self)
        self.proxy = True
        self.backend = None
        self.can_cache = True
        self.host = None
        self.path = None
        self.accept_deflate = False

    def on_first_line(self, method, path, protocol):
        """Record the path of GET requests; other methods bypass the cache."""
        if method != 'GET':
            self.can_cache = False
        else:
            self.path = path

    def on_header(self, key, value):
        """Pick up the headers that influence caching.

        Keys are compared in lower case, as the rest of this class does.
        """
        if key in self._uncacheable_headers:
            self.can_cache = False
        elif key == 'host':
            self.host = value
        elif key == 'accept-encoding':
            self.accept_deflate = 'deflate' in value

    def on_headers_end(self, message):
        """Dispatch the request to the cache or straight to the backend.

        ``message`` is sent on as-is.  If the backend cannot be reached the
        client receives the 'bad gateway' error.
        """
        if self.can_cache:
            cache.serve_page(self.host, self.path, message, self)
            return
        try:
            sock = socket.socket()
            try:
                sock.connect(settings.backend_address)
            except IOError:
                # Do not leave the descriptor behind on a failed connect.
                sock.close()
                raise
            self.backend = BackendClient(loop, sock, self)
            self.backend.send(message)
        except IOError:
            # Make sure on_chunk() does not write to a broken backend.
            self.backend = None
            self.send_error('bad gateway')

    def on_chunk(self, chunk):
        """Forward request body data to the backend, if one is attached."""
        if self.backend:
            self.backend.send(chunk)

    def on_close(self):
        """Run the base close handling and drop this client from the registry."""
        super(Client, self).on_close()
        self.all_clients.remove(self)

    def send_error(self, name):
        """Send the canned response settings.error_codes[name] and terminate.

        The entry is unpacked as (status code, reason phrase, body); the
        body is sent with a trailing newline, hence the +1 in the length.
        """
        code, desc, cont = settings.error_codes[name]
        self.send('HTTP/1.0 %i %s\r\ncontent-length: %i\r\n\r\n%s\n' % (code, desc, len(cont)+1, cont))
        self.terminate()
def main():
    """Set up listeners and timers, drop privileges, and run the event loop.

    Reads its configuration from ``settings``: daemonise, listen_addresses,
    cache_refresh_rate, compress_content, chgrp, chuid and chroot.
    """
    # The event loop is published as a module global because Cache and
    # Client refer to it by the bare name ``loop``.
    global loop

    # daemonise: the parent exits, the forked child carries on
    if settings.daemonise and os.fork() > 0:
        sys.exit(0)

    loop = ev.Loop()

    # listen on all ports
    # (servers and timers are bound to local names for the lifetime of
    # main(); presumably the ev objects must stay referenced -- confirm)
    servers = []
    for address in settings.listen_addresses:
        sock = socket.socket()
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(address)
        sock.listen(64)
        servers.append(ev.AsyncServer(loop, sock, Client))

    # refresh: cache.refresh is scheduled cache_refresh_rate times a second
    interval = 1.0 / settings.cache_refresh_rate
    refresh_timer = ev.Timer(loop, cache.refresh, interval, interval)

    # compression queue polling
    compress_pop_timer = ev.Timer(loop, compressor.poll, 0.01, 0.01)
    # Build a real list: map() returns a one-shot iterator on Python 3,
    # which would be exhausted after the first pass over the patterns.
    settings.compress_content = [
        re.compile(pattern) for pattern in settings.compress_content
    ]

    # chroot and setuid/setgid jail
    # Names are resolved to ids before the chroot, and the group is changed
    # before the user.
    # NOTE(review): supplementary groups are not changed here.
    gid = None
    uid = None
    gid_name = settings.chgrp
    if gid_name is not None:
        gid = grp.getgrnam(gid_name)[2]
    uid_name = settings.chuid
    if uid_name is not None:
        uid = pwd.getpwnam(uid_name)[2]
    chroot_name = settings.chroot
    if chroot_name is not None:
        os.chroot(chroot_name)
        os.chdir('/')
    if gid is not None:
        os.setgid(gid)
    if uid is not None:
        os.setuid(uid)

    # run forever
    loop.loop(True)
# Start the proxy only when this file is executed as a script.
if __name__ == "__main__":
    main()