Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 61 additions & 19 deletions ibind/oauth/oauth1a.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,18 @@ def version(self):
dh_prime: str = var.IBIND_OAUTH1A_DH_PRIME
""" The hex representation of the Diffie-Hellman prime. """

encryption_key_fp: str = var.IBIND_OAUTH1A_ENCRYPTION_KEY_FP
encryption_key_fp: Optional[str] = var.IBIND_OAUTH1A_ENCRYPTION_KEY_FP
""" The path to the private OAuth 1.0a encryption key. """

signature_key_fp: str = var.IBIND_OAUTH1A_SIGNATURE_KEY_FP
encryption_key: Optional[str] = var.IBIND_OAUTH1A_ENCRYPTION_KEY
""" The private OAuth 1.0a encryption key content (alternative to encryption_key_fp). """

signature_key_fp: Optional[str] = var.IBIND_OAUTH1A_SIGNATURE_KEY_FP
""" The path to the private OAuth 1.0a signature key. """

signature_key: Optional[str] = var.IBIND_OAUTH1A_SIGNATURE_KEY
""" The private OAuth 1.0a signature key content (alternative to signature_key_fp). """

dh_generator: str = var.IBIND_OAUTH1A_DH_GENERATOR
""" The Diffie-Hellman generator value. """

Expand All @@ -93,20 +99,29 @@ def verify_config(self) -> None:
'access_token_secret',
'consumer_key',
'dh_prime',
'encryption_key_fp',
'signature_key_fp',
]
missing_params = [param for param in required_params if getattr(self, param) is None]
if missing_params:
raise ValueError(f'OAuth1aConfig is missing required parameters: {", ".join(missing_params)}')

required_filepaths = [
'encryption_key_fp',
'signature_key_fp',
]
missing_filepaths = [filepath for filepath in required_filepaths if not Path(getattr(self, filepath)).exists()]
if missing_filepaths:
raise ValueError(f"OAuth1aConfig's filepaths don't exist: {', '.join(missing_filepaths)}")
# Validate encryption key: at least one of (encryption_key_fp or encryption_key) must be provided
has_encryption_fp = self.encryption_key_fp is not None
has_encryption_content = self.encryption_key is not None
if not (has_encryption_fp or has_encryption_content):
raise ValueError('OAuth1aConfig requires either encryption_key_fp or encryption_key to be provided')

# Validate signature key: at least one of (signature_key_fp or signature_key) must be provided
has_signature_fp = self.signature_key_fp is not None
has_signature_content = self.signature_key is not None
if not (has_signature_fp or has_signature_content):
raise ValueError('OAuth1aConfig requires either signature_key_fp or signature_key to be provided')

# If file paths are provided, validate they exist
if has_encryption_fp and not Path(self.encryption_key_fp).exists():
raise ValueError(f"OAuth1aConfig's encryption_key_fp doesn't exist: {self.encryption_key_fp}")

if has_signature_fp and not Path(self.signature_key_fp).exists():
raise ValueError(f"OAuth1aConfig's signature_key_fp doesn't exist: {self.signature_key_fp}")


def req_live_session_token(client: 'IbkrClient', oauth_config: OAuth1aConfig) -> tuple[str, int, str]:
Expand Down Expand Up @@ -166,7 +181,11 @@ def prepare_oauth(oauth_config: OAuth1aConfig) -> tuple[str, dict, str]:
dh_generator=int(oauth_config.dh_generator),
)
prepend = calculate_live_session_token_prepend(
access_token_secret=oauth_config.access_token_secret, private_encryption_key=read_private_key(private_key_fp=oauth_config.encryption_key_fp)
access_token_secret=oauth_config.access_token_secret,
private_encryption_key=read_private_key(
private_key_fp=oauth_config.encryption_key_fp,
private_key_content=oauth_config.encryption_key
)
)

extra_headers = {'diffie_hellman_challenge': dh_challenge}
Expand Down Expand Up @@ -204,7 +223,10 @@ def generate_oauth_headers(
if signature_method == 'HMAC-SHA256':
signature = generate_hmac_sha_256_signature(base_string=base_string, live_session_token=live_session_token)
else:
private_signature_key = read_private_key(oauth_config.signature_key_fp)
private_signature_key = read_private_key(
private_key_fp=oauth_config.signature_key_fp,
private_key_content=oauth_config.signature_key
)
signature = generate_rsa_sha_256_signature(base_string=base_string, private_signature_key=private_signature_key)

headers.update({'oauth_signature': signature})
Expand All @@ -229,13 +251,33 @@ def generate_request_timestamp() -> str:
return str(int(time.time()))


def read_private_key(private_key_fp: str) -> RSA.RsaKey:
def read_private_key(private_key_fp: Optional[str] = None, private_key_content: Optional[str] = None) -> RSA.RsaKey:
"""
Reads the private key from the file path provided. The key is used to sign the request and decrypt the access token secret.
"""
file_mode = 'r'
with open(private_key_fp, file_mode) as f:
private_key = RSA.importKey(f.read())
Reads the private key from either a file path or directly from content.
At least one of private_key_fp or private_key_content must be provided.
If both are provided, private_key_content takes precedence.

Parameters:
private_key_fp: The file path to the private key.
private_key_content: The private key content as a string.

Returns:
RSA.RsaKey: The imported RSA private key.

Raises:
ValueError: If neither private_key_fp nor private_key_content is provided.
"""
if private_key_content:
# Use the content directly
private_key = RSA.importKey(private_key_content)
elif private_key_fp is not None:
# Read from file
file_mode = 'r'
with open(private_key_fp, file_mode) as f:
private_key = RSA.importKey(f.read())
else:
raise ValueError('Either private_key_fp or private_key_content must be provided')

return private_key


Expand Down
6 changes: 6 additions & 0 deletions ibind/var.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,15 @@ def to_bool(value):
IBIND_OAUTH1A_ENCRYPTION_KEY_FP = os.getenv('IBIND_OAUTH1A_ENCRYPTION_KEY_FP', None)
""" The path to the private OAuth 1.0a encryption key. """

IBIND_OAUTH1A_ENCRYPTION_KEY = os.getenv('IBIND_OAUTH1A_ENCRYPTION_KEY', None)
""" The private OAuth 1.0a encryption key content (alternative to IBIND_OAUTH1A_ENCRYPTION_KEY_FP). """

IBIND_OAUTH1A_SIGNATURE_KEY_FP = os.getenv('IBIND_OAUTH1A_SIGNATURE_KEY_FP', None)
""" The path to the private OAuth 1.0a signature key. """

IBIND_OAUTH1A_SIGNATURE_KEY = os.getenv('IBIND_OAUTH1A_SIGNATURE_KEY', None)
""" The private OAuth 1.0a signature key content (alternative to IBIND_OAUTH1A_SIGNATURE_KEY_FP). """

IBIND_OAUTH1A_DH_GENERATOR = int(os.getenv('IBIND_OAUTH1A_DH_GENERATOR', 2))
""" The Diffie-Hellman generator value. """

Expand Down