-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserving_stocks.py
More file actions
107 lines (94 loc) · 4.39 KB
/
serving_stocks.py
File metadata and controls
107 lines (94 loc) · 4.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# Copyright 2019 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import mlrun
import torch
from cloudpickle import load
from typing import List
import numpy as np
import warnings
import cloudpickle
import mlrun.feature_store as fstore
import pandas as pd
from mlrun.frameworks.pytorch import PyTorchModelServer
import datetime
import v3io_frames as v3f
import os
# Suppress every warning for the whole process (no category or module filter is given).
warnings.filterwarnings("ignore")
def preprocess(event):
    """Fetch a feature vector and turn it into normalized per-ticker model inputs.

    Reads from ``event``:
        vector_name: feature vector passed to ``fstore.get_offline_features``.
        start_time / end_time: offsets, in days before "now", delimiting the
            query window (they are passed as the first positional argument of
            ``datetime.timedelta``, i.e. days).
        seq_size: number of rows per ticker used as the model input sequence.

    The price columns (Open/High/Low/Close) are z-scored with one shared
    mean/std computed over all four columns; Volume is z-scored on its own.
    For every ticker the ``seq_size`` rows preceding its latest row become the
    input sequence, and the normalized Close of the latest row is the label.

    Writes to ``event`` and returns it:
        columns: feature column names (everything except ticker/Datetime).
        price_mean, price_std, volume_mean, volume_std: the statistics that
            were used for normalization (needed to de-normalize later).
        tickers, datetimes: per-sequence ticker and timestamp of the label row.
        inputs, labels: nested lists with the sequences and their labels.
    """
    vector_name = event['vector_name']
    seq_size = event['seq_size']

    # Sample the clock once so both window edges are relative to the same instant.
    now = datetime.datetime.now()
    start_time = now - datetime.timedelta(event['start_time'])
    end_time = now - datetime.timedelta(event['end_time'])

    train_dataset = fstore.get_offline_features(
        vector_name,
        entity_timestamp_column='Datetime',
        with_indexes=True,
        start_time=start_time,
        end_time=end_time,
    )
    price_cols = ['Open', 'High', 'Low', 'Close']
    # reset_index turns the index columns returned by with_indexes=True into
    # regular columns so they can be filtered and sorted on below.
    df = train_dataset.to_dataframe().reset_index(drop=False)
    # Missing values are replaced by 1 before any statistics are computed.
    df.fillna(value=1, inplace=True)
    normalized_df = df.copy()

    # Capture the raw statistics BEFORE normalizing. Reading them back from the
    # normalized columns afterwards would always yield ~0 and ~1.
    price_series = pd.concat([normalized_df[col] for col in price_cols])
    price_std = price_series.std()
    price_mean = price_series.mean()
    volume_mean = normalized_df['Volume'].mean()
    volume_std = normalized_df['Volume'].std()

    normalized_df[price_cols] = (normalized_df[price_cols] - price_mean) / price_std
    normalized_df['Volume'] = (normalized_df['Volume'] - volume_mean) / volume_std

    data = []
    labels = []
    tickers_list = []
    datetimes = []
    for ticker in df['ticker'].unique():
        ticker_df = normalized_df[normalized_df['ticker'] == ticker].sort_values(
            by='Datetime', ascending=True
        )
        # Timestamp of the most recent row, i.e. the row being predicted.
        datetimes.append(list(ticker_df['Datetime'])[-1])
        ticker_df = ticker_df.drop(['ticker', 'Datetime'], axis=1)
        # The seq_size rows right before the last one form the input sequence ...
        data.append(ticker_df[-seq_size - 1:-1].values.tolist())
        # ... and the last row's (normalized) Close is the target.
        labels.append(list(ticker_df['Close'])[-1])
        tickers_list.append(ticker)

    # Round-trip through tensors as the original did; torch.tensor also rejects
    # ragged sequences (tickers with fewer than seq_size + 1 rows).
    data = torch.tensor(data).detach()
    labels = torch.tensor(labels, dtype=torch.float).detach()

    event['columns'] = list(normalized_df.drop(['ticker', 'Datetime'], axis=1).columns)
    event['price_mean'] = price_mean
    event['price_std'] = price_std
    event['volume_mean'] = volume_mean
    event['volume_std'] = volume_std
    event['tickers'] = tickers_list
    event['datetimes'] = datetimes
    event['inputs'] = data.tolist()
    event['labels'] = labels.tolist()
    return event
def postprocess(event):
    """De-normalize predictions, persist them, and return them as plain lists.

    Reads from ``event['outputs']``: ``results`` (model predictions),
    ``datetimes``, ``tickers``, ``labels``, ``price_std`` and ``price_mean``.
    Predictions and labels are mapped back to price scale with
    ``value * price_std + price_mean``. Each row gets a ``key`` made of the
    ticker and the formatted timestamp, and the table is written through the
    v3io frames client with that key as index column.

    Returns:
        A two-element list: the rows as nested lists and the column names.

    Raises:
        KeyError: if the ``V3IO_USERNAME`` environment variable is not set.
    """
    outputs = event['outputs']
    timestamp_format = '%Y-%m-%d %H:%M:%S'

    df = pd.DataFrame(data=outputs['results'], columns=['prediction'])
    df['datetime'] = outputs['datetimes']
    df['tickers'] = outputs['tickers']
    # Format the timestamps once and reuse the strings for the row key.
    df['datetime'] = df['datetime'].apply(lambda ts: ts.strftime(timestamp_format))
    df['key'] = [
        ticker + ' ' + stamp for ticker, stamp in zip(df['tickers'], df['datetime'])
    ]
    df['true'] = outputs['labels']

    # Undo the z-score normalization so values are back in price units.
    df['prediction'] = (df['prediction'] * outputs['price_std']) + outputs['price_mean']
    df['true'] = (df['true'] * outputs['price_std']) + outputs['price_mean']

    # Persist the predictions through the frames 'kv' backend, keyed by 'key'.
    framesd = os.getenv("V3IO_FRAMESD", 'framesd:8081')
    client = v3f.Client(framesd, container=os.getenv('V3IO_CONTAINER', 'projects'))
    kv_table_path = '/stocks-' + os.environ['V3IO_USERNAME'] + '/artifacts/stocks_prediction'
    client.write('kv', kv_table_path, dfs=df.reset_index(), index_cols=['key'])

    return [df.values.tolist(), list(df.columns)]
class StocksModel(PyTorchModelServer):
    """Model server that runs ``self.model`` on every input sequence of a request."""

    def predict(self, body: dict) -> dict:
        """Generate model predictions from sample.

        Each entry of ``body['inputs']`` is converted to a tensor, reshaped to
        ``(1, 5, -1)`` and passed to ``self.model``; the first element of the
        model output (after ``tolist()``) is collected.

        Args:
            body: request body; must contain an ``'inputs'`` list.

        Returns:
            The same ``body`` dict with the predictions stored under
            ``'results'``, in the same order as ``body['inputs']``.
        """
        all_results = []
        # Inference only: no gradients are needed, so skip autograd tracking.
        with torch.no_grad():
            for feats in body['inputs']:
                # NOTE(review): the hard-coded 5 presumably matches the sequence
                # length the model expects - confirm against the training code.
                feats = torch.tensor(feats).reshape(1, 5, -1)
                result = self.model(feats)
                all_results.append(result.tolist()[0])
        body['results'] = all_results
        return body