Coverage for src/onepass_env/core.py: 0%

60 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-18 11:04 -0300

1"""Core functionality for importing environment variables from 1Password.""" 

2 

3from pathlib import Path 

4from typing import Any, Dict, List, Optional, Union 

5 

6from rich.console import Console 

7 

8from onepass_env.exceptions import OnePassEnvError 

9from onepass_env.onepassword import OnePasswordClient 

10 

# Module-level Rich console; EnvImporter._log prints its verbose messages through it.
console = Console()

12 

13 

class EnvImporter:
    """Imports environment variables from 1Password items.

    Each labelled, non-empty field of a 1Password item becomes one
    ``KEY="value"`` line in a dotenv-style file.
    """

    def __init__(
        self,
        vault: str = "tokens",
        verbose: bool = False
    ):
        """Initialize the environment importer.

        Args:
            vault: 1Password vault name
            verbose: Enable verbose logging
        """
        self.vault = vault
        self.verbose = verbose
        self.op_client = OnePasswordClient(vault=vault, verbose=verbose)

    def _log(self, message: str) -> None:
        """Log a message if verbose mode is enabled."""
        if self.verbose:
            console.print(f"[dim]{message}[/dim]")

    def _fetch_item(self, item_name: str, auth_error: str) -> Any:
        """Return the item titled ``item_name`` after checking authentication.

        Args:
            item_name: Name of the 1Password item
            auth_error: Message used when the client is not authenticated

        Raises:
            OnePassEnvError: If the client is not authenticated or the item
                is not found in the vault.
        """
        if not self.op_client.is_authenticated():
            raise OnePassEnvError(auth_error)

        item = self.op_client.get_item_by_title(item_name)
        if not item:
            raise OnePassEnvError(f"Item '{item_name}' not found in vault '{self.vault}'")
        return item

    @staticmethod
    def _format_env_line(key: str, value: Any) -> str:
        """Render one ``KEY="value"`` line for a dotenv file.

        Backslashes are escaped before double quotes so that a value containing
        ``\\`` (or ending in one) cannot be mistaken for an escape sequence or
        swallow the closing quote when the file is parsed again.
        """
        escaped_value = str(value).replace("\\", "\\\\").replace('"', '\\"')
        return f'{key}="{escaped_value}"\n'

    def import_to_file(
        self,
        item_name: str,
        output_file: Union[str, Path] = "1pass.env",
        field_filter: Optional[List[str]] = None,
        merge_existing: bool = True
    ) -> Dict[str, str]:
        """Import environment variables from a 1Password item to a file.

        Args:
            item_name: Name of the 1Password item
            output_file: Path to output file
            field_filter: List of specific fields to import (None or empty for all)
            merge_existing: Whether to merge with existing variables

        Returns:
            Dictionary of imported variables (empty if no field matched, in
            which case the output file is not touched)

        Raises:
            OnePassEnvError: If the client is not authenticated or the item
                is not found in the vault.
        """
        output_path = Path(output_file)

        item = self._fetch_item(
            item_name,
            "Not authenticated with 1Password. Please check your OP_SERVICE_ACCOUNT_TOKEN "
            "environment variable and ensure it has access to the specified vault.",
        )
        self._log(f"Found item: {item.title} (ID: {item.id})")

        # Extract fields, skipping unlabelled/empty ones and anything filtered out
        imported_vars: Dict[str, str] = {}
        for field in item.fields:
            if not field.label or not field.value:
                continue
            if field_filter and field.label not in field_filter:
                continue
            imported_vars[field.label] = field.value
            self._log(f"Imported field: {field.label}")

        if not imported_vars:
            self._log("No fields found to import")
            return {}

        # Merge with existing variables if requested. Reading is best-effort:
        # any failure (including python-dotenv not being installed) is logged in
        # verbose mode and the file is then rewritten from the imported values only.
        existing_vars: Dict[str, Optional[str]] = {}
        if merge_existing and output_path.exists():
            try:
                from dotenv import dotenv_values
                existing_vars = dict(dotenv_values(str(output_path)))
                self._log(f"Found {len(existing_vars)} existing variables in {output_file}")
            except Exception as e:
                self._log(f"Could not read existing env file: {e}")

        # Imported variables take precedence
        final_vars = {**existing_vars, **imported_vars}

        lines = [
            "# Environment variables imported from 1Password\n",
            f"# Vault: {self.vault}\n",
            f"# Item: {item_name}\n",
            "# Generated by 1pass-env\n\n",
        ]
        # Keys parsed without a value come back as None and are dropped
        lines.extend(
            self._format_env_line(key, value)
            for key, value in final_vars.items()
            if value is not None
        )

        # Explicit UTF-8 so non-ASCII values do not depend on the platform locale
        with open(output_path, "w", encoding="utf-8") as f:
            f.writelines(lines)

        self._log(f"Imported {len(imported_vars)} variables to {output_file}")
        return imported_vars

    def get_item_fields(self, item_name: str) -> List[str]:
        """Get list of field names from a 1Password item.

        Args:
            item_name: Name of the 1Password item

        Returns:
            List of field names (only fields with both a label and a value)

        Raises:
            OnePassEnvError: If the client is not authenticated or the item
                is not found in the vault.
        """
        item = self._fetch_item(
            item_name,
            "Not authenticated with 1Password. Please check your OP_SERVICE_ACCOUNT_TOKEN "
            "environment variable.",
        )
        return [field.label for field in item.fields if field.label and field.value]

133 raise OnePassEnvError(f"Item '{item_name}' not found in vault '{self.vault}'") 

134 

135 return [field.label for field in item.fields if field.label and field.value]