feat(vaults): add DecryptLabs API support to HTTP vault

This commit is contained in:
Andy
2025-10-04 21:58:30 +00:00
parent d6f8e42f52
commit 4cec942613

View File

@@ -16,13 +16,21 @@ class InsertResult(Enum):
class HTTP(Vault):
"""Key Vault using HTTP API with support for both query parameters and JSON payloads."""
"""
Key Vault using HTTP API with support for multiple API modes.
Supported modes:
- query: Uses GET requests with query parameters
- json: Uses POST requests with JSON payloads
- decrypt_labs: Uses DecryptLabs API format (read-only)
"""
def __init__(
self,
name: str,
host: str,
password: str,
password: Optional[str] = None,
api_key: Optional[str] = None,
username: Optional[str] = None,
api_mode: str = "query",
no_push: bool = False,
@@ -34,13 +42,17 @@ class HTTP(Vault):
name: Vault name
host: Host URL
password: Password for query mode or API token for json mode
username: Username (required for query mode, ignored for json mode)
api_mode: "query" for query parameters or "json" for JSON API
api_key: API key (alternative to password, used for decrypt_labs mode)
username: Username (required for query mode, ignored for json/decrypt_labs mode)
api_mode: "query" for query parameters, "json" for JSON API, or "decrypt_labs" for DecryptLabs API
no_push: If True, this vault will not receive pushed keys
"""
super().__init__(name, no_push)
self.url = host
self.password = password
self.password = api_key or password
if not self.password:
raise ValueError("Either password or api_key is required")
self.username = username
self.api_mode = api_mode.lower()
self.current_title = None
@@ -48,11 +60,15 @@ class HTTP(Vault):
self.session.headers.update({"User-Agent": f"unshackle v{__version__}"})
self.api_session_id = None
if self.api_mode == "decrypt_labs":
self.session.headers.update({"decrypt-labs-api-key": self.password})
self.no_push = True
# Validate configuration based on mode
if self.api_mode == "query" and not self.username:
raise ValueError("Username is required for query mode")
elif self.api_mode not in ["query", "json"]:
raise ValueError("api_mode must be either 'query' or 'json'")
elif self.api_mode not in ["query", "json", "decrypt_labs"]:
raise ValueError("api_mode must be either 'query', 'json', or 'decrypt_labs'")
def request(self, method: str, params: dict = None) -> dict:
"""Make a request to the JSON API vault."""
@@ -95,7 +111,51 @@ class HTTP(Vault):
if isinstance(kid, UUID):
kid = kid.hex
if self.api_mode == "json":
if self.api_mode == "decrypt_labs":
try:
request_payload = {"service": service.lower(), "kid": kid}
response = self.session.post(self.url, json=request_payload)
if not response.ok:
return None
data = response.json()
if data.get("message") != "success":
return None
cached_keys = data.get("cached_keys")
if not cached_keys:
return None
if isinstance(cached_keys, str):
try:
cached_keys = json.loads(cached_keys)
except json.JSONDecodeError:
return cached_keys
if isinstance(cached_keys, dict):
if cached_keys.get("kid") == kid:
return cached_keys.get("key")
if kid in cached_keys:
return cached_keys[kid]
elif isinstance(cached_keys, list):
for entry in cached_keys:
if isinstance(entry, dict):
if entry.get("kid") == kid:
return entry.get("key")
elif isinstance(entry, str) and ":" in entry:
entry_kid, entry_key = entry.split(":", 1)
if entry_kid == kid:
return entry_key
except Exception as e:
print(f"Failed to get key from DecryptLabs ({e.__class__.__name__}: {e})")
return None
return None
elif self.api_mode == "json":
try:
params = {
"kid": kid,
@@ -132,7 +192,9 @@ class HTTP(Vault):
return data["keys"][0]["key"]
def get_keys(self, service: str) -> Iterator[tuple[str, str]]:
if self.api_mode == "json":
if self.api_mode == "decrypt_labs":
return iter([])
elif self.api_mode == "json":
# JSON API doesn't support getting all keys, so return empty iterator
# This will cause the copy command to rely on the API's internal duplicate handling
return iter([])
@@ -153,6 +215,9 @@ class HTTP(Vault):
if not key or key.count("0") == len(key):
raise ValueError("You cannot add a NULL Content Key to a Vault.")
if self.api_mode == "decrypt_labs":
return False
if isinstance(kid, UUID):
kid = kid.hex
@@ -192,6 +257,9 @@ class HTTP(Vault):
return data.get("status_code") == 200
def add_keys(self, service: str, kid_keys: dict[Union[UUID, str], str]) -> int:
if self.api_mode == "decrypt_labs":
return 0
for kid, key in kid_keys.items():
if not key or key.count("0") == len(key):
raise ValueError("You cannot add a NULL Content Key to a Vault.")
@@ -243,7 +311,9 @@ class HTTP(Vault):
return inserted_count
def get_services(self) -> Iterator[str]:
if self.api_mode == "json":
if self.api_mode == "decrypt_labs":
return iter([])
elif self.api_mode == "json":
try:
response = self.request("GetServices")
services = response.get("services", [])
@@ -283,6 +353,9 @@ class HTTP(Vault):
if not key or key.count("0") == len(key):
raise ValueError("You cannot add a NULL Content Key to a Vault.")
if self.api_mode == "decrypt_labs":
return InsertResult.FAILURE
if isinstance(kid, UUID):
kid = kid.hex