Coverage for src/onepass_env/core.py: 0%
60 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-18 11:04 -0300
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-18 11:04 -0300
1"""Core functionality for importing environment variables from 1Password."""
3from pathlib import Path
4from typing import Any, Dict, List, Optional, Union
6from rich.console import Console
8from onepass_env.exceptions import OnePassEnvError
9from onepass_env.onepassword import OnePasswordClient
11console = Console()
14class EnvImporter:
15 """Imports environment variables from 1Password items."""
17 def __init__(
18 self,
19 vault: str = "tokens",
20 verbose: bool = False
21 ):
22 """Initialize the environment importer.
24 Args:
25 vault: 1Password vault name
26 verbose: Enable verbose logging
27 """
28 self.vault = vault
29 self.verbose = verbose
30 self.op_client = OnePasswordClient(vault=vault, verbose=verbose)
32 def _log(self, message: str) -> None:
33 """Log a message if verbose mode is enabled."""
34 if self.verbose:
35 console.print(f"[dim]{message}[/dim]")
37 def import_to_file(
38 self,
39 item_name: str,
40 output_file: Union[str, Path] = "1pass.env",
41 field_filter: Optional[List[str]] = None,
42 merge_existing: bool = True
43 ) -> Dict[str, str]:
44 """Import environment variables from a 1Password item to a file.
46 Args:
47 item_name: Name of the 1Password item
48 output_file: Path to output file
49 field_filter: List of specific fields to import (None for all)
50 merge_existing: Whether to merge with existing variables
52 Returns:
53 Dictionary of imported variables
54 """
55 output_path = Path(output_file)
57 # Check authentication
58 if not self.op_client.is_authenticated():
59 raise OnePassEnvError(
60 "Not authenticated with 1Password. Please check your OP_SERVICE_ACCOUNT_TOKEN "
61 "environment variable and ensure it has access to the specified vault."
62 )
64 # Get the item from 1Password
65 item = self.op_client.get_item_by_title(item_name)
66 if not item:
67 raise OnePassEnvError(f"Item '{item_name}' not found in vault '{self.vault}'")
69 self._log(f"Found item: {item.title} (ID: {item.id})")
71 # Extract fields
72 imported_vars = {}
73 for field in item.fields:
74 if not field.label or not field.value:
75 continue
77 # Apply field filter if specified
78 if field_filter and field.label not in field_filter:
79 continue
81 imported_vars[field.label] = field.value
82 self._log(f"Imported field: {field.label}")
84 if not imported_vars:
85 self._log("No fields found to import")
86 return {}
88 # Merge with existing variables if requested
89 existing_vars = {}
90 if merge_existing and output_path.exists():
91 try:
92 from dotenv import dotenv_values
93 existing_vars = dict(dotenv_values(str(output_path)))
94 self._log(f"Found {len(existing_vars)} existing variables in {output_file}")
95 except Exception as e:
96 self._log(f"Could not read existing env file: {e}")
98 # Imported variables take precedence
99 final_vars = {**existing_vars, **imported_vars}
101 # Write to file
102 with open(output_path, 'w') as f:
103 f.write(f"# Environment variables imported from 1Password\n")
104 f.write(f"# Vault: {self.vault}\n")
105 f.write(f"# Item: {item_name}\n")
106 f.write(f"# Generated by 1pass-env\n\n")
108 for key, value in final_vars.items():
109 if value is not None:
110 escaped_value = str(value).replace('"', '\\"')
111 f.write(f'{key}="{escaped_value}"\n')
113 self._log(f"Imported {len(imported_vars)} variables to {output_file}")
114 return imported_vars
116 def get_item_fields(self, item_name: str) -> List[str]:
117 """Get list of field names from a 1Password item.
119 Args:
120 item_name: Name of the 1Password item
122 Returns:
123 List of field names
124 """
125 if not self.op_client.is_authenticated():
126 raise OnePassEnvError(
127 "Not authenticated with 1Password. Please check your OP_SERVICE_ACCOUNT_TOKEN "
128 "environment variable."
129 )
131 item = self.op_client.get_item_by_title(item_name)
132 if not item:
133 raise OnePassEnvError(f"Item '{item_name}' not found in vault '{self.vault}'")
135 return [field.label for field in item.fields if field.label and field.value]