Coverage for main.py: 74%
278 statements
coverage.py v7.10.6, created at 2025-09-18 11:04 -0300
1import click
2import os
3from onepassword.client import Client
4from dotenv import load_dotenv, dotenv_values
5import asyncio
# Load environment variables from .env file.
# This runs at import time, before any command executes; get_client() later
# reads OP_SERVICE_ACCOUNT_TOKEN via os.getenv().
load_dotenv()
def validate_output_path(file_path):
    """Validate that we can write to the output file path.

    The parent directory is created when it does not exist yet, so a
    successful validation may leave a new (empty) directory behind.

    Args:
        file_path: Path of the file that is about to be written.

    Returns:
        A ``(ok, error)`` tuple: ``(True, None)`` when the path looks
        writable, otherwise ``(False, message)`` with a human-readable reason.
        This function never raises; unexpected errors are reported through
        the returned message.
    """
    if not file_path:
        return False, "Output file path is empty"

    try:
        target = os.path.abspath(file_path)

        # A directory can be "writable" yet can never be opened as a file.
        if os.path.isdir(target):
            return False, f"Path '{file_path}' is a directory, not a file"

        directory = os.path.dirname(target)
        if not os.path.exists(directory):
            try:
                os.makedirs(directory, exist_ok=True)
            except PermissionError:
                return False, f"Cannot create directory '{directory}' - permission denied"
        elif not os.path.isdir(directory):
            # The parent component exists but is a regular file.
            return False, f"'{directory}' exists but is not a directory"

        # Check if we can write to the directory
        if not os.access(directory, os.W_OK):
            return False, f"Directory '{directory}' is not writable"

        # If file exists, check if it's writable
        if os.path.exists(file_path) and not os.access(file_path, os.W_OK):
            return False, f"File '{file_path}' exists but is not writable"

        return True, None
    except Exception as e:
        # Validation is best-effort: report the problem instead of crashing.
        return False, f"Path validation error: {str(e)}"
def validate_fields(field_names, max_length=100):
    """Validate field names for basic format.

    A name is rejected when it is empty (or whitespace only), contains any
    whitespace character (spaces, tabs, newlines, ...), or is longer than
    ``max_length`` characters.

    Args:
        field_names: Iterable of field-name strings; may be empty or ``None``.
        max_length: Maximum accepted length of a single name.

    Returns:
        ``(True, None)`` when every name is acceptable (or none were given),
        otherwise ``(False, message)`` listing each offending name.
    """
    if not field_names:
        return True, None

    problems = []
    for field in field_names:
        if not field.strip():
            problems.append("(empty)")
        elif any(ch.isspace() for ch in field):
            # Covers tabs/newlines as well as plain spaces.
            problems.append(f"'{field}' (contains spaces)")
        elif len(field) > max_length:
            problems.append(f"'{field}' (too long)")

    if problems:
        return False, f"Invalid field names: {', '.join(problems)}"

    return True, None
async def get_client():
    """Initialize and authenticate 1Password client with proper error handling.

    Reads the service-account token from ``OP_SERVICE_ACCOUNT_TOKEN`` and
    passes it to ``Client.authenticate``.

    Returns:
        The authenticated client object returned by ``Client.authenticate``.

    Raises:
        ValueError: The token variable is unset or empty (a ValueError coming
            out of the authenticate call is reported the same way).
        Exception: Any other failure while authenticating. In every case the
            error is echoed to the user before being re-raised.
    """
    try:
        service_token = os.getenv('OP_SERVICE_ACCOUNT_TOKEN')
        if not service_token:
            raise ValueError("OP_SERVICE_ACCOUNT_TOKEN environment variable is required")

        return await Client.authenticate(
            auth=service_token,
            integration_name="My 1Password Integration",
            integration_version="v1.0.0",
        )
    except ValueError as config_error:
        click.echo(f"Configuration Error: {str(config_error)}")
        raise
    except Exception as auth_error:
        click.echo(f"Authentication Error: Failed to connect to 1Password - {str(auth_error)}")
        raise
# Root command group; the `import` and `export` subcommands attach to it via
# @cli.command. The docstring doubles as the --help text, so keep it short.
@click.group()
def cli():
    """CLI tool to integrate with 1Password for importing/exporting env vars."""
def _field_label(field):
    """Return the name a 1Password item field is exported under.

    Tries ``title``, then ``label``, then ``id``/``field_id``, and finally
    falls back to the literal ``'unknown'``.
    """
    field_id = getattr(field, 'id', getattr(field, 'field_id', 'unknown'))
    return getattr(field, 'title', getattr(field, 'label', field_id))


def _collect_item_fields(item, wanted, env_vars, debug):
    """Copy the fields of *item* selected by *wanted* into *env_vars*.

    An empty *wanted* selects every field. Later items overwrite earlier
    entries that share a label.
    """
    for field in item.fields:
        label = _field_label(field)
        if debug:
            field_type = getattr(field, 'type', getattr(field, 'field_type', 'unknown'))
            # Never echo the value itself: these are secrets and debug output
            # tends to end up in terminals, logs and bug reports.
            click.echo(f"Debug: Processing field '{label}' (type: {field_type}, value hidden)")
        if not wanted or label in wanted:
            env_vars[label] = str(getattr(field, 'value', ''))


def _format_env_line(key, value):
    """Return one ``KEY=value`` line for a .env file.

    Values containing whitespace, ``#`` or quote characters are wrapped in
    double quotes with backslash escapes so that a dotenv parser reads back
    exactly the original string; plain values are written unchanged.
    """
    if any(ch.isspace() or ch in "#'\"" for ch in value):
        escaped = (
            value.replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace('\n', '\\n')
            .replace('\r', '\\r')
            .replace('\t', '\\t')
        )
        value = f'"{escaped}"'
    return f"{key}={value}\n"


def _report_fetch_error(error, debug):
    """Echo an item-fetch failure; must be called from inside ``except``."""
    click.echo(f"Error: Failed to fetch items from vault - {str(error)}")
    if debug:
        import traceback
        click.echo(f"Debug: Full traceback:\n{traceback.format_exc()}")


# NOTE: the docstring of the command below is shown verbatim by `--help`, so
# developer documentation lives in comments instead.
#
# Flow: validate inputs -> authenticate -> locate the vault -> collect fields
# from either the item called --name or every item whose title contains the
# current directory name -> confirm overwrite -> write the .env file.
# All failures are reported with click.echo and the command simply returns.
@cli.command(name='import')
@click.option('--vault', default='tokens', help='1Password vault name (default: tokens)')
@click.option('--file', default='.env', help='Path to output .env file')
@click.option('--name', default=None, help='Specific entry name in vault')
@click.option('--fields', multiple=True, help='Fields to filter (e.g., OPEN_KEY,API_KEY)')
@click.option('--debug', is_flag=True, help='Enable debug output')
def import_env(vault, file, name, fields, debug):
    """Import env vars from 1Password vault to a .env file."""
    # The help text advertises comma-separated names, but click hands over each
    # --fields occurrence verbatim; split so both spellings actually filter.
    fields = tuple(part for raw in fields for part in raw.split(','))

    # Input validation
    valid, error = validate_fields(fields)
    if not valid:
        click.echo(f"Validation Error: {error}")
        return

    # Use the specified file name directly
    output_file = file
    valid, error = validate_output_path(output_file)
    if not valid:
        click.echo(f"Validation Error: {error}")
        return

    project_name = os.path.basename(os.getcwd())
    click.echo(f"Importing from vault '{vault}' for project '{project_name}'...")

    if debug:
        click.echo(f"Debug: Using vault='{vault}', file='{file}', name='{name}', fields={fields}")

    async def run():
        try:
            client = await get_client()
            if debug:
                click.echo("Debug: Successfully authenticated with 1Password")
        except Exception:
            # Error already handled and displayed in get_client()
            return

        try:
            # Get vault by name
            if debug:
                click.echo("Debug: Fetching vault list...")
            vaults = await client.vaults.list()
            if debug:
                click.echo(f"Debug: Found {len(vaults)} vaults: {[v.title for v in vaults]}")

            vault_obj = next((v for v in vaults if v.title == vault), None)
            if not vault_obj:
                click.echo(f"Error: Vault '{vault}' not found. Available vaults: {', '.join([v.title for v in vaults])}")
                return

            if debug:
                click.echo(f"Debug: Using vault '{vault}' (ID: {vault_obj.id})")

            env_vars = {}
            if name:
                # Fetch specific item
                try:
                    if debug:
                        click.echo(f"Debug: Searching for specific item '{name}'...")
                    items = await client.items.list(vault_obj.id)
                    if debug:
                        click.echo(f"Debug: Found {len(items)} items in vault")
                        click.echo(f"Debug: Item titles: {[i.title for i in items]}")

                    item_overview = next((i for i in items if i.title == name), None)
                    if not item_overview:
                        available_items = [i.title for i in items]
                        click.echo(f"Error: Item '{name}' not found in vault '{vault}'.")
                        if available_items:
                            click.echo(f"Available items: {', '.join(available_items[:5])}{'...' if len(available_items) > 5 else ''}")
                        return

                    # The list call returns overviews without fields, so the
                    # full item has to be fetched separately.
                    if debug:
                        click.echo(f"Debug: Fetching full details for item '{name}' (ID: {item_overview.id})")
                    item = await client.items.get(vault_obj.id, item_overview.id)
                    if debug:
                        click.echo(f"Debug: Item has {len(item.fields)} fields")
                        if item.fields:
                            click.echo(f"Debug: Sample field attributes: {dir(item.fields[0])}")

                    _collect_item_fields(item, fields, env_vars, debug)
                except Exception as e:
                    _report_fetch_error(e, debug)
                    return
            else:
                # Fetch all items, use project_name as filter
                try:
                    if debug:
                        click.echo(f"Debug: Searching for items containing '{project_name}'...")
                    items = await client.items.list(vault_obj.id)
                    if debug:
                        click.echo(f"Debug: Found {len(items)} total items in vault")

                    matching_items = [item for item in items if project_name in item.title]

                    if not matching_items:
                        click.echo(f"Warning: No items found containing '{project_name}' in vault '{vault}'")
                        if debug:
                            click.echo(f"Debug: Available items: {[i.title for i in items]}")
                        return

                    if debug:
                        click.echo(f"Debug: Found {len(matching_items)} matching items: {[i.title for i in matching_items]}")

                    for item_overview in matching_items:
                        if debug:
                            click.echo(f"Debug: Fetching full details for item '{item_overview.title}'")
                        item = await client.items.get(vault_obj.id, item_overview.id)
                        if debug:
                            click.echo(f"Debug: Item '{item_overview.title}' has {len(item.fields)} fields")
                            # Dump attribute names once, for the first item only.
                            if item.fields and item_overview == matching_items[0]:
                                click.echo(f"Debug: Sample field attributes: {dir(item.fields[0])}")

                        _collect_item_fields(item, fields, env_vars, debug)

                    click.echo(f"Found {len(matching_items)} matching item(s)")
                except Exception as e:
                    _report_fetch_error(e, debug)
                    return

            if not env_vars:
                click.echo("Warning: No environment variables found to import")
                return

            if debug:
                click.echo(f"Debug: Collected {len(env_vars)} environment variables")

            # Check if file exists and prompt for confirmation
            if os.path.exists(output_file):
                if debug:
                    click.echo(f"Debug: File '{output_file}' already exists")

                overwrite = click.confirm(
                    f"File '{output_file}' already exists. Do you want to overwrite it?",
                    default=False
                )

                if not overwrite:
                    click.echo("Operation cancelled. File not modified.")
                    return

                if debug:
                    click.echo(f"Debug: User confirmed overwrite of '{output_file}'")

            try:
                if debug:
                    click.echo(f"Debug: Writing to file '{output_file}'")
                # Explicit UTF-8 so the result does not depend on the locale.
                with open(output_file, 'w', encoding='utf-8') as f:
                    f.write('# Generated from 1Password\n')
                    f.writelines(_format_env_line(key, value) for key, value in env_vars.items())
                click.echo(f"Successfully imported {len(env_vars)} environment variables to '{output_file}'")
            except PermissionError:
                click.echo(f"Error: Permission denied writing to '{output_file}'")
            except Exception as e:
                click.echo(f"Error: Failed to write file '{output_file}' - {str(e)}")

        except Exception as e:
            click.echo(f"Unexpected error: {str(e)}")
            if debug:
                import traceback
                click.echo(f"Debug: Full traceback:\n{traceback.format_exc()}")

    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        click.echo("\nOperation cancelled by user")
    except Exception as e:
        click.echo(f"Fatal error: {str(e)}")
# NOTE: the docstring of the command below is shown verbatim by `--help`, so
# developer documentation lives in comments instead.
#
# Flow: validate inputs -> authenticate -> read (and optionally filter) the
# .env file -> locate the vault -> create one item holding every variable as a
# field. All failures are reported with click.echo and the command returns.
# The item is always *created*; an existing entry with the same title is not
# updated, so repeated exports produce additional items.
@cli.command()
@click.option('--vault', default='tokens', help='1Password vault name (default: tokens)')
@click.option('--file', default='.env', help='Path to input .env file')
@click.option('--name', default=None, help='Entry name for vault (defaults to project folder name)')
@click.option('--fields', multiple=True, help='Fields to export (e.g., OPEN_KEY,API_KEY)')
@click.option('--debug', is_flag=True, help='Enable debug output')
def export(vault, file, name, fields, debug):
    """Export env vars from .env file to 1Password vault."""
    # The help text advertises comma-separated names, but click hands over each
    # --fields occurrence verbatim; split so both spellings actually filter.
    fields = tuple(part for raw in fields for part in raw.split(','))

    # Input validation
    valid, error = validate_fields(fields)
    if not valid:
        click.echo(f"Validation Error: {error}")
        return

    if not os.path.exists(file):
        click.echo(f"Validation Error: Input file '{file}' not found")
        return

    project_name = os.path.basename(os.getcwd())
    entry_name = name or project_name
    click.echo(f"Exporting to vault '{vault}' for entry '{entry_name}'...")

    if debug:
        click.echo(f"Debug: Using vault='{vault}', file='{file}', name='{name}', fields={fields}")

    async def run():
        try:
            client = await get_client()
            if debug:
                click.echo("Debug: Successfully authenticated with 1Password")
        except Exception:
            # Error already handled and displayed in get_client()
            return

        try:
            # Re-check after authenticating: the file may have gone away.
            if not os.path.exists(file):
                click.echo(f"Error: Input file '{file}' not found")
                return

            # Read .env file
            try:
                env_vars = dotenv_values(file)
                if not env_vars:
                    click.echo(f"Warning: No environment variables found in '{file}'")
                    return
            except Exception as e:
                click.echo(f"Error: Failed to read file '{file}' - {str(e)}")
                return

            # Filter fields if specified
            if fields:
                original_count = len(env_vars)
                env_vars = {k: v for k, v in env_vars.items() if k in fields}
                # Keep the order the user typed (and drop repeats) so the
                # warning is deterministic.
                missing_fields = [f for f in dict.fromkeys(fields) if f not in env_vars]
                if missing_fields:
                    click.echo(f"Warning: Fields not found in file: {', '.join(missing_fields)}")
                click.echo(f"Filtered {original_count} variables down to {len(env_vars)} specified fields")

            if not env_vars:
                click.echo("Error: No environment variables to export")
                return

            # Get vault by name
            try:
                vaults = await client.vaults.list()
                vault_obj = next((v for v in vaults if v.title == vault), None)
                if not vault_obj:
                    available_vaults = [v.title for v in vaults]
                    click.echo(f"Error: Vault '{vault}' not found. Available vaults: {', '.join(available_vaults)}")
                    return
            except Exception as e:
                click.echo(f"Error: Failed to list vaults - {str(e)}")
                return

            # Create the item in the vault
            try:
                # Imported here so that a missing or incompatible SDK is
                # reported through the handler below rather than at import.
                from onepassword.types import (
                    ItemCategory,
                    ItemCreateParams,
                    ItemField,
                    ItemFieldType,
                )

                # Per the SDK documentation, items.create() takes a single
                # ItemCreateParams object rather than keyword arguments.
                # dotenv_values() yields None for keys written without "=",
                # which is not a valid field value, so store "" instead.
                item_fields = [
                    ItemField(
                        id=key,
                        title=key,
                        field_type=ItemFieldType.CONCEALED,
                        value=value or "",
                    )
                    for key, value in env_vars.items()
                ]
                await client.items.create(
                    ItemCreateParams(
                        title=entry_name,
                        category=ItemCategory.LOGIN,  # Using LOGIN as a generic category
                        vault_id=vault_obj.id,
                        fields=item_fields,
                    )
                )
                click.echo(f"Successfully exported {len(env_vars)} environment variables to vault '{vault}' under entry '{entry_name}'")
            except Exception as e:
                click.echo(f"Error: Failed to create item in vault - {str(e)}")

        except Exception as e:
            click.echo(f"Unexpected error: {str(e)}")

    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        click.echo("\nOperation cancelled by user")
    except Exception as e:
        click.echo(f"Fatal error: {str(e)}")
# Run the click command group only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    cli()