-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfiles.py
More file actions
226 lines (189 loc) · 6.58 KB
/
files.py
File metadata and controls
226 lines (189 loc) · 6.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
"""Wrapper for low-level OS calls regarding files and directories"""
# Grupo 14:
# 81900 - Nuno Anselmo
# 81936 - Liliana Oliveira
# 82047 - Andre Mendes
import os
import shutil
import tempfile
import utils
class File(object):
"""Wrapper for low-level OS calls regarding files"""
def __init__(self, path=None):
super(File, self).__init__()
self.file = None
if path is not None:
self.path = path
else:
temp = tempfile.mkstemp()
os.close(temp[0])
self.path = temp[1]
self.file = open(self.get_path(), 'w+b')
def open(self):
"""Opens the file object"""
self.file = open(self.get_path(), 'rb')
def is_open(self):
"""Returns whether the file is open or not"""
return self.file is not None
def close(self):
"""Closes the file object"""
if self.is_open():
self.file.close()
def flush(self):
"""Forcefully flushes pending data to file"""
if self.is_open():
self.file.flush()
def get_path(self):
"""Returns the file's path"""
return os.path.normpath(self.path)
def get_relpath(self, path=None):
"""Returns the file's path relative to another one"""
if path is None:
path = os.curdir
return os.path.relpath(self.get_path(), path)
def move(self, destination):
"""Moves the file to the given path"""
destination = os.path.normpath(destination)
if utils.DEBUG_LEVEL >= 1:
utils.log_message("DEBUG", "Moving file to path " + str(destination))
reopen = False
if self.is_open():
reopen = True
self.close()
if os.path.exists(destination):
if os.path.isdir(destination):
os.rmdir(destination)
else:
os.remove(destination)
try:
os.makedirs(os.path.split(destination)[0])
except Exception as _:
pass
shutil.move(self.get_path(), destination)
self.path = destination
if reopen:
self.file = open(destination)
def get_timestamp(self):
"""Returns the modified timestamp"""
return int(os.stat(self.get_path()).st_mtime)
def set_timestamp(self, timestamp):
"""Sets the modified and accessed timestamp to the given one"""
os.utime(self.get_path(), (timestamp, timestamp))
def set_position(self, position):
"""Sets the seek position"""
if self.is_open():
self.file.seek(position)
def get_position(self):
"""Gets the current seek position"""
if self.is_open():
return self.file.tell()
else:
return 0
def get_size(self):
"""Gets the size"""
return int(os.stat(self.get_path()).st_size)
def write(self, data):
"""Writes data to file"""
if not self.is_open():
self.open()
self.file.write(data)
self.close()
else:
self.file.write(data)
def read(self, count):
"""Reads data from the file"""
if not self.is_open():
self.open()
data = self.file.read(count)
self.close()
else:
data = self.file.read(count)
return data
def bytes(self):
"""Byte generator for this file"""
for chunk in self.chunks(8192):
for byte in chunk:
yield byte
def chunks(self, chunksize=1024):
"""Chunk generator for this file"""
close = False
if not self.is_open():
close = True
self.open()
temp = self.get_position()
self.set_position(0)
while True:
chunk = self.read(chunksize)
if chunk:
yield chunk
else:
break
self.set_position(temp)
if close:
self.close()
class Directory(object):
"""Wrapper for low-level OS calls regarding directories"""
def __init__(self, path=None):
super(Directory, self).__init__()
if path is not None:
self.path = os.path.normpath(path)
if not os.path.exists(self.path):
os.makedirs(self.path)
else:
self.path = tempfile.mkdtemp()
def get_path(self):
"""Returns the directory's path"""
return os.path.realpath(self.path)
def get_relpath(self, path=None):
"""Returns the directory's path relative to another one"""
if path is None:
path = os.curdir
return os.path.relpath(self.get_path(), path)
def move(self, destination):
"""Moves the directory to the given path"""
destination = os.path.normpath(destination)
if utils.DEBUG_LEVEL >= 1:
utils.log_message("DEBUG", "Moving directory to path " + str(destination))
if os.path.exists(destination):
if os.path.isdir(destination):
os.rmdir(self.get_path())
self.path = destination
return
else:
os.remove(destination)
try:
os.makedirs(os.path.split(destination)[0])
except Exception as _:
pass
shutil.move(self.get_path(), destination)
self.path = destination
def get_timestamp(self):
"""Returns the modified timestamp"""
return int(os.stat(self.get_path()).st_mtime)
def set_timestamp(self, timestamp):
"""Sets the modified and accessed timestamp to the given one"""
os.utime(self.get_path(), (timestamp, timestamp))
def list(self, recursive=True, directories_after_files=False):
"""Generator for this directory, containing files and directories"""
paths = os.listdir(self.get_path())
for path in paths:
abs_path = os.path.join(self.get_path(), path)
if os.path.isdir(abs_path):
directory = Directory(abs_path)
if not directories_after_files:
yield directory
if recursive:
for sub in directory.list(recursive, directories_after_files):
yield sub
if directories_after_files:
yield directory
else:
yield File(abs_path)
def get_wrapper(path):
"""Returns the correct wrapper for the given path"""
if os.path.exists(path):
if os.path.isdir(path):
return Directory(path)
elif os.path.isfile(path):
return File(path)
return None