From 396506a5f070693397b239c79515ba064aaabf21 Mon Sep 17 00:00:00 2001 From: Daniel Hougaard Date: Sun, 26 Jan 2025 20:58:10 +0100 Subject: [PATCH] feat: crypto module --- infisical_sdk/__init__.py | 3 +- infisical_sdk/client.py | 36 ++++++- infisical_sdk/crypto.py | 202 ++++++++++++++++++++++++++++++++++++++ requirements.txt | 2 + 4 files changed, 241 insertions(+), 2 deletions(-) create mode 100644 infisical_sdk/crypto.py diff --git a/infisical_sdk/__init__.py b/infisical_sdk/__init__.py index 508a560..117e08a 100644 --- a/infisical_sdk/__init__.py +++ b/infisical_sdk/__init__.py @@ -1,3 +1,4 @@ from .client import InfisicalSDKClient # noqa from .infisical_requests import InfisicalError # noqa -from .api_types import SingleSecretResponse, ListSecretsResponse, BaseSecret # noqa \ No newline at end of file +from .api_types import SingleSecretResponse, ListSecretsResponse, BaseSecret # noqa +from .crypto import create_symmetric_key_helper, encrypt_symmetric_helper, decrypt_symmetric_helper # noqa \ No newline at end of file diff --git a/infisical_sdk/client.py b/infisical_sdk/client.py index 97cc2db..0eb4638 100644 --- a/infisical_sdk/client.py +++ b/infisical_sdk/client.py @@ -15,16 +15,31 @@ from .api_types import ListSecretsResponse, MachineIdentityLoginResponse from .api_types import SingleSecretResponse, BaseSecret +from .crypto import ( + create_symmetric_key_helper, + decrypt_symmetric_helper, + encrypt_symmetric_helper, +) + class InfisicalSDKClient: - def __init__(self, host: str, token: str = None): + def __init__(self, host: str = None, token: str = None): + + if host is None: + host = "https://app.infisical.com" + self.host = host + + if host.endswith("/api"): + host = host[:-4] + self.access_token = token self.api = InfisicalRequests(host=host, token=token) self.auth = Auth(self) self.secrets = V3RawSecrets(self) + self.crypto = Cryptography(self) def set_token(self, token: str): """ @@ -343,3 +358,22 @@ def delete_secret_by_name( ) return result.data.secret + + +class Cryptography: + def __init__(self, client: InfisicalSDKClient) -> None: + self.client = client + + def create_symmetric_key(self) -> str: + """Create a base64-encoded, 256-bit symmetric key""" + return create_symmetric_key_helper() + + def encrypt_symmetric(self, plaintext: str, key: str): + """Encrypt the plaintext `plaintext` with the (base64) 256-bit secret key `key`""" + return encrypt_symmetric_helper(plaintext, key) + + def decrypt_symmetric(self, ciphertext: str, key: str, iv: str, tag: str): + """Decrypt the ciphertext `ciphertext` with the (base64) 256-bit secret key `key`, + provided `iv` and `tag`""" + + return decrypt_symmetric_helper(ciphertext, key, iv, tag) diff --git a/infisical_sdk/crypto.py b/infisical_sdk/crypto.py new file mode 100644 index 0000000..4dc819e --- /dev/null +++ b/infisical_sdk/crypto.py @@ -0,0 +1,202 @@ +from base64 import b64decode, b64encode +from typing import Tuple, Union + +from Cryptodome.Cipher import AES +from Cryptodome.Random import get_random_bytes +from nacl import public, utils + +Base64String = str +Buffer = Union[bytes, bytearray, memoryview] + + +def encrypt_asymmetric( + plaintext: Union[Buffer, str], + public_key: Union[Buffer, Base64String, public.PublicKey], + private_key: Union[Buffer, Base64String, public.PrivateKey], +) -> Tuple[Base64String, Base64String]: + """Performs asymmetric encryption of the ``plaintext`` with x25519-xsalsa20-poly1305 + algorithm with the given parameters. + Each of those params should be either the raw value in bytes or a base64 string. + + :param plaintext: The text to encrypt + :param public_key: The public key + :param private_key: The private key + + :raises ValueError: If ``plaintext``, ``public_key`` or ``private_key`` are empty + + :return: A tuple containing the ciphered text and the random nonce used for encryption + """ + if (not isinstance(public_key, public.PublicKey) and len(public_key) == 0) or ( + not isinstance(private_key, public.PrivateKey) and len(private_key) == 0 + ): + raise ValueError("Public key and private key cannot be empty!") + + m_plaintext = ( + str.encode(plaintext, "utf-8") if isinstance(plaintext, str) else plaintext + ) + m_public_key = ( + b64decode(public_key) if isinstance(public_key, Base64String) else public_key + ) + m_public_key = ( + public.PublicKey(m_public_key) + if isinstance(m_public_key, (bytes, bytearray, memoryview)) + else m_public_key + ) + m_private_key = ( + b64decode(private_key) if isinstance(private_key, Base64String) else private_key + ) + m_private_key = ( + public.PrivateKey(m_private_key) + if isinstance(m_private_key, (bytes, bytearray, memoryview)) + else m_private_key + ) + + nonce = utils.random(24) + box = public.Box(m_private_key, m_public_key) + ciphertext = box.encrypt(m_plaintext, nonce).ciphertext + + return (b64encode(ciphertext).decode("utf-8"), b64encode(nonce).decode("utf-8")) + + +def decrypt_asymmetric( + ciphertext: Union[Buffer, Base64String], + nonce: Union[Buffer, Base64String], + public_key: Union[Buffer, Base64String, public.PublicKey], + private_key: Union[Buffer, Base64String, public.PrivateKey], +) -> str: + """Performs asymmetric decryption of the ``ciphertext`` with x25519-xsalsa20-poly1305 + algorithm with the given parameters. + Each of those params should be either the raw value in bytes or a base64 string. + + :param ciphertext: The ciphered text to decrypt + :param nonce: The nonce used for encryption + :param public_key: The public key + :param private_key: The private key + + :raises ValueError: If ``ciphertext``, ``nonce``, ``public_key`` or ``private_key`` are empty + + :return: The deciphered text + """ + if ( + len(ciphertext) == 0 + or len(nonce) == 0 + or (not isinstance(public_key, public.PublicKey) and len(public_key) == 0) + or (not isinstance(private_key, public.PrivateKey) and len(private_key) == 0) + ): + raise ValueError( + "Public key, private key, ciphertext and nonce cannot be empty!" + ) + + m_ciphertext = ( + b64decode(ciphertext) if isinstance(ciphertext, Base64String) else ciphertext + ) + m_nonce = b64decode(nonce) if isinstance(nonce, Base64String) else nonce + m_public_key = ( + b64decode(public_key) if isinstance(public_key, Base64String) else public_key + ) + m_public_key = ( + public.PublicKey(m_public_key) + if isinstance(m_public_key, (bytes, bytearray, memoryview)) + else m_public_key + ) + m_private_key = ( + b64decode(private_key) if isinstance(private_key, Base64String) else private_key + ) + m_private_key = ( + public.PrivateKey(m_private_key) + if isinstance(m_private_key, (bytes, bytearray, memoryview)) + else m_private_key + ) + + box = public.Box(m_private_key, m_public_key) + plaintext = box.decrypt(m_ciphertext, m_nonce) + + return plaintext.decode("utf-8") + + +def create_symmetric_key_helper(): + return b64encode(get_random_bytes(32)).decode("utf-8") + + +def encrypt_symmetric_helper(plaintext: str, key: str): + iv = get_random_bytes(12) + + cipher = AES.new(b64decode(key), AES.MODE_GCM, nonce=iv) + + ciphertext, tag = cipher.encrypt_and_digest(plaintext.encode("utf-8")) + + return ( + b64encode(ciphertext).decode("utf-8"), + b64encode(iv).decode("utf-8"), + b64encode(tag).decode("utf-8"), + ) + + +def decrypt_symmetric_helper(ciphertext: str, key: str, iv: str, tag: str): + cipher = AES.new(b64decode(key), AES.MODE_GCM, nonce=b64decode(iv)) + plaintext = cipher.decrypt_and_verify(b64decode(ciphertext), b64decode(tag)) + + return plaintext.decode("utf-8") + + +def encrypt_symmetric_128_bit_hex_key_utf8( + plaintext: str, key: str +) -> Tuple[Base64String, Base64String, Base64String]: + """Encrypts the ``plaintext`` with aes-256-gcm using the given ``key``. + The key should be either the raw value in bytes or a base64 string. + + :param plaintext: text to encrypt + :param key: UTF-8, 128-bit AES key used for encryption + + :raises ValueError: If either ``plaintext`` or ``key`` is empty + + :return: Ciphered text + """ + if len(key) == 0: + raise ValueError("The given key is empty!") + + BLOCK_SIZE_BYTES = 16 + + iv = get_random_bytes(BLOCK_SIZE_BYTES) + cipher = AES.new(bytes(key, "utf-8"), AES.MODE_GCM, nonce=iv) + + ciphertext, tag = cipher.encrypt_and_digest(str.encode(plaintext, "utf-8")) + + return ( + b64encode(ciphertext).decode("utf-8"), + b64encode(iv).decode("utf-8"), + b64encode(tag).decode("utf-8"), + ) + + +def decrypt_symmetric_128_bit_hex_key_utf8( + key: str, ciphertext: str, tag: str, iv: str +) -> str: + """Decrypts the ``ciphertext`` with aes-256-gcm using ``iv``, ``tag`` + and ``key``. + + :param key: UTF-8, 128-bit hex AES key + :param ciphertext: base64 ciphered text to decrypt + :param tag: base64 tag/mac used for verification + :param iv: base64 nonce + + :raises ValueError: + If ``ciphertext``, ``iv``, ``tag`` or ``key`` are empty or tag/mac doesn't match + + :return: Deciphered text + """ + if len(tag) == 0 or len(iv) == 0 or len(key) == 0: + raise ValueError("One of the given parameter is empty!") + + try: + key = bytes(key, "utf-8") + iv = b64decode(iv) + tag = b64decode(tag) + ciphertext = b64decode(ciphertext) + + cipher = AES.new(key, AES.MODE_GCM, nonce=iv) + plaintext = cipher.decrypt_and_verify(ciphertext, tag) + + return plaintext.decode("utf-8") + except ValueError: + raise ValueError("Incorrect decryption or MAC check failed") diff --git a/requirements.txt b/requirements.txt index 6d8cfa1..c9b2523 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,5 @@ aenum >= 3.1.11 requests >= 2.31.0 boto3 >= 1.33.8 botocore >= 1.33.8 +pycryptodomex >= 3.20.0 +PyNaCl >= 1.5.0 \ No newline at end of file