From 4cec942613934e59ea2c2ccd4c49561628f2f79f Mon Sep 17 00:00:00 2001 From: Andy Date: Sat, 4 Oct 2025 21:58:30 +0000 Subject: [PATCH] feat(vaults): add DecryptLabs API support to HTTP vault --- unshackle/vaults/HTTP.py | 93 +++++++++++++++++++++++++++++++++++----- 1 file changed, 83 insertions(+), 10 deletions(-) diff --git a/unshackle/vaults/HTTP.py b/unshackle/vaults/HTTP.py index b3db62f..65a72f7 100644 --- a/unshackle/vaults/HTTP.py +++ b/unshackle/vaults/HTTP.py @@ -16,13 +16,21 @@ class InsertResult(Enum): class HTTP(Vault): - """Key Vault using HTTP API with support for both query parameters and JSON payloads.""" + """ + Key Vault using HTTP API with support for multiple API modes. + + Supported modes: + - query: Uses GET requests with query parameters + - json: Uses POST requests with JSON payloads + - decrypt_labs: Uses DecryptLabs API format (read-only) + """ def __init__( self, name: str, host: str, - password: str, + password: Optional[str] = None, + api_key: Optional[str] = None, username: Optional[str] = None, api_mode: str = "query", no_push: bool = False, @@ -34,13 +42,17 @@ class HTTP(Vault): name: Vault name host: Host URL password: Password for query mode or API token for json mode - username: Username (required for query mode, ignored for json mode) - api_mode: "query" for query parameters or "json" for JSON API + api_key: API key (alternative to password, used for decrypt_labs mode) + username: Username (required for query mode, ignored for json/decrypt_labs mode) + api_mode: "query" for query parameters, "json" for JSON API, or "decrypt_labs" for DecryptLabs API no_push: If True, this vault will not receive pushed keys """ super().__init__(name, no_push) self.url = host - self.password = password + self.password = api_key or password + if not self.password: + raise ValueError("Either password or api_key is required") + self.username = username self.api_mode = api_mode.lower() self.current_title = None @@ -48,11 +60,15 @@ class HTTP(Vault): self.session.headers.update({"User-Agent": f"unshackle v{__version__}"}) self.api_session_id = None + if self.api_mode == "decrypt_labs": + self.session.headers.update({"decrypt-labs-api-key": self.password}) + self.no_push = True + # Validate configuration based on mode if self.api_mode == "query" and not self.username: raise ValueError("Username is required for query mode") - elif self.api_mode not in ["query", "json"]: - raise ValueError("api_mode must be either 'query' or 'json'") + elif self.api_mode not in ["query", "json", "decrypt_labs"]: + raise ValueError("api_mode must be either 'query', 'json', or 'decrypt_labs'") def request(self, method: str, params: dict = None) -> dict: """Make a request to the JSON API vault.""" @@ -95,7 +111,51 @@ class HTTP(Vault): if isinstance(kid, UUID): kid = kid.hex - if self.api_mode == "json": + if self.api_mode == "decrypt_labs": + try: + request_payload = {"service": service.lower(), "kid": kid} + + response = self.session.post(self.url, json=request_payload) + + if not response.ok: + return None + + data = response.json() + + if data.get("message") != "success": + return None + + cached_keys = data.get("cached_keys") + if not cached_keys: + return None + + if isinstance(cached_keys, str): + try: + cached_keys = json.loads(cached_keys) + except json.JSONDecodeError: + return cached_keys + + if isinstance(cached_keys, dict): + if cached_keys.get("kid") == kid: + return cached_keys.get("key") + if kid in cached_keys: + return cached_keys[kid] + elif isinstance(cached_keys, list): + for entry in cached_keys: + if isinstance(entry, dict): + if entry.get("kid") == kid: + return entry.get("key") + elif isinstance(entry, str) and ":" in entry: + entry_kid, entry_key = entry.split(":", 1) + if entry_kid == kid: + return entry_key + + except Exception as e: + print(f"Failed to get key from DecryptLabs ({e.__class__.__name__}: {e})") + return None + return None + + elif self.api_mode == "json": try: params = { "kid": kid, @@ -132,7 +192,9 @@ class HTTP(Vault): return data["keys"][0]["key"] def get_keys(self, service: str) -> Iterator[tuple[str, str]]: - if self.api_mode == "json": + if self.api_mode == "decrypt_labs": + return iter([]) + elif self.api_mode == "json": # JSON API doesn't support getting all keys, so return empty iterator # This will cause the copy command to rely on the API's internal duplicate handling return iter([]) @@ -153,6 +215,9 @@ class HTTP(Vault): if not key or key.count("0") == len(key): raise ValueError("You cannot add a NULL Content Key to a Vault.") + if self.api_mode == "decrypt_labs": + return False + if isinstance(kid, UUID): kid = kid.hex @@ -192,6 +257,9 @@ class HTTP(Vault): return data.get("status_code") == 200 def add_keys(self, service: str, kid_keys: dict[Union[UUID, str], str]) -> int: + if self.api_mode == "decrypt_labs": + return 0 + for kid, key in kid_keys.items(): if not key or key.count("0") == len(key): raise ValueError("You cannot add a NULL Content Key to a Vault.") @@ -243,7 +311,9 @@ class HTTP(Vault): return inserted_count def get_services(self) -> Iterator[str]: - if self.api_mode == "json": + if self.api_mode == "decrypt_labs": + return iter([]) + elif self.api_mode == "json": try: response = self.request("GetServices") services = response.get("services", []) @@ -283,6 +353,9 @@ class HTTP(Vault): if not key or key.count("0") == len(key): raise ValueError("You cannot add a NULL Content Key to a Vault.") + if self.api_mode == "decrypt_labs": + return InsertResult.FAILURE + if isinstance(kid, UUID): kid = kid.hex