Skip to content

Commit

Permalink
feat: add key management API integration to SDK
Browse files Browse the repository at this point in the history
- Added support for KMS key lifecycle (list, create, update, delete) via `V1Keys`.
- Implemented encryption and decryption using KMS keys.
- Updated `api_types.py` and `client.py` for key management functionality.
  • Loading branch information
subhayu99 committed Jan 16, 2025
1 parent 224c16c commit ca73c5c
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 0 deletions.
52 changes: 52 additions & 0 deletions infisical_sdk/api_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,55 @@ class MachineIdentityLoginResponse(BaseModel):
expiresIn: int
accessTokenMaxTTL: int
tokenType: str


@dataclass
class BaseKey(BaseModel):
createdAt: str
id: str
name: str
orgId: str
updatedAt: str
description: Optional[str] = None
isDisabled: Optional[bool] = field(default=False)
isReserved: Optional[bool] = field(default=True)
projectId: Optional[str] = None
slug: Optional[str] = None


@dataclass
class ListKey(BaseKey):
encryptionAlgorithm: str = "aes-256-gcm"
version: int = 1

@dataclass
class ListKeysResponse(BaseModel):
keys: List[ListKey]
totalCount: int

@classmethod
def from_dict(cls, data: Dict) -> 'ListKeysResponse':
return cls(
keys=[ListKey.from_dict(key) for key in data['keys']],
totalCount=data['totalCount']
)


@dataclass
class SingleKeyResponse(BaseModel):
key: BaseKey

@classmethod
def from_dict(cls, data: Dict) -> 'SingleKeyResponse':
return cls(
key=BaseKey.from_dict(data['key'])
)

@dataclass
class EncryptDataResponse(BaseModel):
ciphertext: str


@dataclass
class DecryptDataResponse(BaseModel):
plaintext: str
171 changes: 171 additions & 0 deletions infisical_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
from .infisical_requests import InfisicalRequests
from .api_types import ListSecretsResponse, MachineIdentityLoginResponse
from .api_types import SingleSecretResponse, BaseSecret
from .api_types import (
BaseKey,
ListKeysResponse,
SingleKeyResponse,
EncryptDataResponse,
DecryptDataResponse,
)


class InfisicalSDKClient:
Expand All @@ -25,6 +32,7 @@ def __init__(self, host: str, token: str = None):

self.auth = Auth(self)
self.secrets = V3RawSecrets(self)
self.keys = V1Keys(self)

def set_token(self, token: str):
"""
Expand Down Expand Up @@ -343,3 +351,166 @@ def delete_secret_by_name(
)

return result.data.secret

class V1Keys:
def __init__(self, client: InfisicalSDKClient):
"""
Initializes the KeysAPI class.
Args:
client: An instance of the API client.
"""
self.client = client

def list_keys(
self,
project_id: str,
offset: int = 0,
limit: int = 100,
order_by: str = "name",
order_direction: str = "asc",
search: str = None,
) -> ListKeysResponse:
"""
List KMS keys for a given project.
Args:
project_id: The project ID to list keys from.
offset: The offset to start from.
limit: The number of keys to return.
order_by: The column to order keys by.
order_direction: The direction to order keys (asc/desc).
search: A text string to filter key names.
Returns:
A dictionary containing the list of keys and the total count.
"""
params = {
"projectId": project_id,
"offset": offset,
"limit": limit,
"orderBy": order_by,
"orderDirection": order_direction,
}
if search:
params["search"] = search

response = self.client.api.get(
path="/api/v1/kms/keys", model=ListKeysResponse, params=params
)
return response.data

def create_key(
self,
name: str,
project_id: str,
description: str = None,
encryption_algorithm: str = "aes-256-gcm",
) -> BaseKey:
"""
Create a new KMS key.
Args:
name: The name of the key.
project_id: The project ID to create the key in.
description: An optional description of the key.
encryption_algorithm: The encryption algorithm to use.
Returns:
A dictionary containing the created key details.
"""
body = {
"name": name,
"projectId": project_id,
"description": description,
"encryptionAlgorithm": encryption_algorithm,
}
response = self.client.api.post(
path="/api/v1/kms/keys", model=SingleKeyResponse, json=body
)
return response.data.key

def update_key(
self,
key_id: str,
name: str = None,
description: str = None,
is_disabled: bool = None,
) -> BaseKey:
"""
Update a KMS key.
Args:
key_id: The ID of the key to update.
name: The updated name of the key.
description: The updated description of the key.
is_disabled: Flag to enable or disable the key.
Returns:
A dictionary containing the updated key details.
"""
body = {}
if name:
body["name"] = name
if description:
body["description"] = description
if is_disabled is not None:
body["isDisabled"] = is_disabled

response = self.client.api.patch(
path=f"/api/v1/kms/keys/{key_id}", model=SingleKeyResponse, json=body
)
return response.data.key

def delete_key(self, key_id: str) -> BaseKey:
"""
Delete a KMS key.
Args:
key_id: The ID of the key to delete.
Returns:
A dictionary containing the details of the deleted key.
"""
response = self.client.api.delete(
path=f"/api/v1/kms/keys/{key_id}", model=SingleKeyResponse
)
return response.data.key

def encrypt_data(self, key_id: str, plaintext: str) -> EncryptDataResponse:
"""
Encrypt data using a KMS key.
Args:
key_id: The ID of the key to encrypt the data with.
plaintext: The plaintext to be encrypted (base64 encoded).
Returns:
A dictionary containing the ciphertext.
"""
body = {"plaintext": plaintext}
response = self.client.api.post(
path=f"/api/v1/kms/keys/{key_id}/encrypt",
model=EncryptDataResponse,
json=body,
)
return response.data

def decrypt_data(self, key_id: str, ciphertext: str) -> DecryptDataResponse:
"""
Decrypt data using a KMS key.
Args:
key_id: The ID of the key to decrypt the data with.
ciphertext: The ciphertext to be decrypted (base64 encoded).
Returns:
A dictionary containing the plaintext.
"""
body = {"ciphertext": ciphertext}
response = self.client.api.post(
path=f"/api/v1/kms/keys/{key_id}/decrypt",
model=DecryptDataResponse,
json=body,
)
return response.data

0 comments on commit ca73c5c

Please sign in to comment.