-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathspirius_http_client.py
More file actions
131 lines (103 loc) · 3.91 KB
/
spirius_http_client.py
File metadata and controls
131 lines (103 loc) · 3.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import base64
import hashlib
import hmac
import json
import time
from typing import Dict, Optional

import requests
from requests import Response
class SpiriusHttpClient:
    """
    HTTP client for sending SMS to, and receiving SMS from, Spirius SMS Gateway via its REST API. The HMAC auth method
    is used, and signatures for this are generated automatically.

    The requests library is used for performing the HTTP requests. All public methods of this class return a Response
    object from the requests lib. No status checking is done here, so callers should inspect the returned response.
    """

    __BASE_URL = "https://rest.spirius.com/v1"
    __ENCODING = "utf8"
    __AUTH_VERSION = "SpiriusSmsV1"
    __REQUEST_TIMEOUT_SECONDS = 5

    def __init__(self, shared_key: str, username: str):
        """
        Store the credentials used to sign every request.

        :param shared_key: Secret key used as the HMAC key.
        :param username: Account name sent in the Authorization header.
        """
        self.__shared_key = shared_key
        self.__username = username

    def send_sms(self, to_str: str, from_str: str, message: str) -> "Response":
        """
        POST a message to the send endpoint.

        :param to_str: Value sent as the "to" field of the request body.
        :param from_str: Value sent as the "from" field of the request body.
        :param message: Value sent as the "message" field of the request body.
        :return: The Response object produced by requests.
        """
        request_body = {
            "message": message,
            "to": to_str,
            "from": from_str,
        }
        return self.__perform_request("POST", "/sms/mt/send", request_body)

    def get_message_status(self, transaction_id: str) -> "Response":
        """
        GET the status resource for the given transaction id.

        :param transaction_id: Identifier interpolated into the request path.
        :return: The Response object produced by requests.
        """
        # NOTE(review): this queries the "mo" path while sending uses "/sms/mt/send"; the status of a sent
        # message may belong under "/sms/mt/status/..." -- confirm against the Spirius API documentation.
        return self.__perform_request("GET", f"/sms/mo/status/{transaction_id}")

    def get_mo_message_list(self) -> "Response":
        """
        GET the "/sms/mo" collection.

        :return: The Response object produced by requests.
        """
        return self.__perform_request("GET", "/sms/mo")

    def get_mo_message(self, transaction_id: str) -> "Response":
        """
        GET a single "/sms/mo/{transaction_id}" resource.

        :param transaction_id: Identifier interpolated into the request path.
        :return: The Response object produced by requests.
        """
        return self.__perform_request("GET", f"/sms/mo/{transaction_id}")

    def pop_mo_message(self, transaction_id: str) -> "Response":
        """
        Issue a DELETE for a single "/sms/mo/{transaction_id}" resource.

        :param transaction_id: Identifier interpolated into the request path.
        :return: The Response object produced by requests.
        """
        return self.__perform_request("DELETE", f"/sms/mo/{transaction_id}")

    def pop_next_message(self) -> "Response":
        """
        Issue a DELETE for "/sms/mo/next".

        :return: The Response object produced by requests.
        """
        return self.__perform_request("DELETE", "/sms/mo/next")

    def __perform_request(self, verb: str, path: str, body: Optional[Dict] = None) -> "Response":
        """
        Sign and send one request.

        :param verb: HTTP method name, also part of the signed message.
        :param path: Path below the base URL, also part of the signed message.
        :param body: Optional JSON-serialisable request body.
        :return: The Response object produced by requests.
        """
        url = f"{self.__BASE_URL}{path}"
        # Serialise exactly once: the bytes that are hashed into the signature must be the very bytes put on the
        # wire, otherwise any difference in JSON formatting would invalidate the signature.
        payload = self.__serialize_body(body)
        timestamp = self.__create_unix_timestamp()
        signature = self.__sign_payload(path, timestamp, verb, payload)
        headers = self.__create_headers(signature, timestamp)
        return requests.request(
            method=verb,
            url=url,
            headers=headers,
            # An empty payload means "no body at all"; the Content-Type header is already set explicitly.
            data=payload or None,
            timeout=self.__REQUEST_TIMEOUT_SECONDS,
        )

    def __serialize_body(self, body: Optional[Dict] = None) -> bytes:
        """
        Turn a request body into the bytes that are both signed and sent.

        :param body: JSON-serialisable mapping, or None for a request without a body.
        :return: Encoded JSON document, or empty bytes when body is None.
        """
        if body is None:
            return b""
        return json.dumps(body).encode(self.__ENCODING)

    @staticmethod
    def __create_unix_timestamp() -> str:
        """Return the current time as whole seconds since the epoch, formatted as a string."""
        return str(int(time.time()))

    def __create_signature(self, path: str, timestamp: str, verb: str, body: Optional[Dict] = None) -> str:
        """
        Compute the request signature for a body given as a mapping.

        :param path: Request path included in the signed message.
        :param timestamp: Timestamp string included in the signed message.
        :param verb: HTTP method name included in the signed message.
        :param body: Optional JSON-serialisable request body.
        :return: Base64 text of the HMAC-SHA256 digest.
        """
        return self.__sign_payload(path, timestamp, verb, self.__serialize_body(body))

    def __sign_payload(self, path: str, timestamp: str, verb: str, payload: bytes) -> str:
        """
        Compute the request signature for an already serialised body.

        The signed message is the auth version, timestamp, verb, path and the SHA-1 hex digest of the payload,
        joined by newlines; it is authenticated with HMAC-SHA256 keyed by the shared key.

        :param path: Request path included in the signed message.
        :param timestamp: Timestamp string included in the signed message.
        :param verb: HTTP method name included in the signed message.
        :param payload: Exact body bytes of the request (empty for no body).
        :return: Base64 text of the HMAC-SHA256 digest.
        """
        msg_to_sign = "\n".join([
            self.__AUTH_VERSION,
            timestamp,
            verb,
            path,
            hashlib.sha1(payload).hexdigest(),
        ])
        digest = hmac.new(
            key=self.__shared_key.encode(self.__ENCODING),
            msg=msg_to_sign.encode(self.__ENCODING),
            digestmod=hashlib.sha256,
        ).digest()
        return base64.b64encode(digest).decode(self.__ENCODING)

    def __create_headers(self, signature: str, timestamp: str) -> Dict[str, str]:
        """
        Build the headers sent with every request.

        :param signature: Signature text placed after the username in the Authorization header.
        :param timestamp: Timestamp string; must be the same value that was signed.
        :return: Mapping of header names to values.
        """
        return {
            "X-SMS-Timestamp": timestamp,
            "Authorization": f"{self.__AUTH_VERSION} {self.__username}:{signature}",
            "Content-Type": "application/json",
        }
if __name__ == "__main__":
    # Minimal usage example: send one message and print the decoded JSON reply.
    # Key is available on the account page on https://portal.spirius.com
    client = SpiriusHttpClient(
        "78701a30f3f83437df6284ced6fc9ba58ca6a31c8031df3e8cb7a17eca7b91ed",
        "test",
    )
    response = client.send_sms("+46123456789", "SPIRIUS", "Hello world!")
    print(response.json())