-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprepare_data.py
More file actions
146 lines (111 loc) · 4.93 KB
/
Copy pathprepare_data.py
File metadata and controls
146 lines (111 loc) · 4.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
import pandas as pd
import numpy as np
from pathlib import Path
import math
import argparse
from clean_data import clean_dataset
from utils import fcall
from configs import get_args_parser
from tsmoothie import smoother as sm
from tsmoothie import bootstrap as bs
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer, KNNImputer
from sklearn.preprocessing import StandardScaler
import joblib
class SolarProcess():
    """Preprocessing stages applied to a solar power DataFrame.

    Each stage takes a DataFrame and returns a DataFrame, so the stages can be
    chained; ``parse`` runs the currently enabled ones in order. Behaviour is
    driven by the ``args`` namespace passed to the constructor (attributes
    read here: ``fill_na``, ``impute``, ``smooth``, ``window_size``,
    ``interval``, ``test``, ``data_output_dir``, ``station``).
    """

    def __init__(self, args):
        self.args = args

    def remove_error(self, dataset):
        """Replace out-of-range power readings with NaN.

        Values of ``power_demand``, ``power_surplus`` and ``power_generation``
        above 500000 or below 0 are set to NaN. The input frame is not
        modified; a cleaned copy is returned.
        """
        features = ["power_demand", "power_surplus", "power_generation"]
        max_value = 500000
        min_value = 0
        cleaned = dataset.copy()
        for fea in features:
            column = cleaned[fea]
            # NaN compares False on both sides, so missing values stay missing.
            out_of_range = (column > max_value) | (column < min_value)
            cleaned[fea] = column.mask(out_of_range, math.nan)
        return cleaned

    @staticmethod
    def _fill_with_mode(column):
        """Fill missing entries of *column* with its most frequent value."""
        counts = column.value_counts()
        if counts.empty:
            # Nothing observed at all: there is no mode to fill with.
            return column
        return column.fillna(counts.index[0])

    def fill_na(self, dataset):
        """Handle missing values according to ``args.fill_na``.

        ``"fill"``: numerical columns get 0 and ``telop_name`` gets its most
        frequent value (the frame passed in is updated in place).
        ``"remove"``: rows containing any NaN are dropped and the index is
        renumbered from 0.
        Any other value returns the dataset unchanged.
        """
        numerical_features = ["power_generation", "power_surplus", "power_demand", "cloud", "solar"]
        categorical_features = ["telop_name"]
        if self.args.fill_na == "fill":
            dataset[numerical_features] = dataset[numerical_features].fillna(0)
            dataset[categorical_features] = dataset[categorical_features].apply(self._fill_with_mode)
        elif self.args.fill_na == "remove":
            dataset = dataset.dropna()
            dataset = dataset.reset_index(drop=True)
        return dataset

    def impute(self, dataset):
        """Impute missing numeric values with the estimator named by ``args.impute``.

        Supported values are ``"MICE"`` (``IterativeImputer``) and ``"KNN"``
        (``KNNImputer``). Missing ``half_hours_from_start`` steps are inserted
        as empty rows first so that they get imputed as well.

        Raises:
            ValueError: if ``args.impute`` is not a supported method.
        """
        if self.args.impute == "MICE":
            imputation = IterativeImputer(max_iter=1000)
        elif self.args.impute == "KNN":
            imputation = KNNImputer(n_neighbors=5)
        else:
            raise ValueError(
                "Unsupported impute method: {!r} (expected 'MICE' or 'KNN')".format(self.args.impute)
            )
        # Full half hours feature. range() stops before the maximum, but the
        # outer merge below keeps every row of the dataset, including that one.
        full_half_hours = list(range(min(dataset["half_hours_from_start"]), max(dataset["half_hours_from_start"])))
        full_half_hours = pd.DataFrame(full_half_hours, columns=["half_hours_from_start"])
        dataset = pd.merge(full_half_hours, dataset, how="outer", on=["half_hours_from_start"]).reset_index(drop=True)
        # Set the non-numeric columns aside; they are re-attached after imputation.
        temp_data = dataset[["date", "time", "telop_name"]]
        dataset = dataset.drop(columns=["date", "telop_name", "telop_code", "time"])
        dataset = pd.DataFrame(imputation.fit_transform(dataset), columns=dataset.columns)
        return dataset.join(temp_data)

    def smooth(self, dataset):
        """Drop rows whose power values exceed the smoother's upper interval.

        Does nothing when ``args.test`` is truthy. Otherwise the
        ``power_generation`` and ``power_demand`` series are smoothed and any
        row, from position ``args.window_size`` onwards, whose value is above
        the upper interval for either series is removed. Only the upper bound
        is checked. Rows are matched to the interval arrays by position, not
        by index label.

        Raises:
            ValueError: if ``args.smooth`` is not a supported smoother.
        """
        if self.args.test:
            return dataset
        features = ["power_generation", "power_demand"]
        if self.args.smooth == "Exponent":
            smoother = sm.ExponentialSmoother(window_len=self.args.window_size, alpha=0.3)
        else:
            raise ValueError(
                "Unsupported smoother: {!r} (expected 'Exponent')".format(self.args.smooth)
            )
        window = self.args.window_size
        smoother.smooth([dataset[fea].tolist() for fea in features])
        _, up_interval = smoother.get_intervals(self.args.interval + "_interval")
        keep = np.ones(len(dataset), dtype=bool)
        for index_series, fea in enumerate(features):
            # The first `window` rows have no interval and are always kept.
            # Assumes the interval arrays hold one entry per remaining row,
            # i.e. shape (n_series, len(dataset) - window) -- TODO confirm.
            tail = dataset[fea].to_numpy()[window:]
            upper = np.asarray(up_interval[index_series])[:len(tail)]
            keep[window:] &= ~(tail > upper)
        return dataset[keep]

    def customize(self, dataset):
        """Cast ``half_hours_from_start`` to ``int32``."""
        # Time index should be type integer
        dataset = dataset.astype({'half_hours_from_start': 'int32'})
        return dataset

    def normalization(self, dataset):
        """Standardise the power columns and persist the fitted scaler.

        ``power_generation`` and ``power_demand`` are replaced in place by
        their ``StandardScaler`` transform, and the scaler is dumped with
        joblib so it can be reloaded later.
        """
        features = ["power_generation", "power_demand"]
        ss = StandardScaler()
        dataset[features] = ss.fit_transform(dataset[features])
        # Use the instance's args rather than a module-level global, so the
        # method also works when this class is imported from another module.
        # NOTE(review): no path separator is inserted before the file name,
        # unlike the CSV path built in prepare_dataset -- confirm that
        # data_output_dir is expected to end with one.
        joblib.dump(ss, self.args.data_output_dir + '{}_scaler.gz'.format(self.args.station))
        return dataset

    def parse(self, dataset):
        """Run the enabled preprocessing stages in order and return the result.

        Currently enabled: ``remove_error`` -> ``fill_na`` -> ``smooth``.
        ``impute``, ``normalization`` and ``customize`` are available on the
        class but are not part of the pipeline.
        """
        funcs = [
            self.remove_error,
            self.fill_na,
            self.smooth,
        ]
        for fun in funcs:
            dataset = fun(dataset)
        return dataset
def prepare_dataset(args, dataset = None):
    """Clean (outside test mode), preprocess and date-sort a dataset.

    When ``args.test`` equals ``False`` the dataset is loaded through
    ``clean_dataset(args)`` and the processed result is also written to
    ``<data_output_dir>/<station>.csv``. Otherwise the ``dataset`` argument is
    processed and nothing is written. The processed frame, sorted by
    ``date``, is returned in both cases.
    """
    if args.test == False:
        dataset = clean_dataset(args)

    pipeline = SolarProcess(args)
    processed = pipeline.parse(dataset).sort_values(by="date")

    # In test mode the result is only handed back to the caller.
    if not (args.test == False):
        return processed

    csv_path = args.data_output_dir + "/{}.csv".format(args.station)
    processed.to_csv(csv_path)
    return processed
# Prepare Data by cleaning and preprocessing
if __name__ == "__main__":
    parser = argparse.ArgumentParser('Solar Model for forecasting task', parents=[get_args_parser()])
    args = parser.parse_args()
    # Create every output location up front. prepare_dataset writes its CSV
    # under data_output_dir, and DataFrame.to_csv does not create missing
    # directories, so creating only output_dir is not enough.
    for directory in (args.output_dir, getattr(args, "data_output_dir", None)):
        if directory:
            Path(directory).mkdir(parents=True, exist_ok=True)
    prepare_dataset(args)