Coverage for main.py: 74%

278 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-18 11:04 -0300

1import click 

2import os 

3from onepassword.client import Client 

4from dotenv import load_dotenv, dotenv_values 

5import asyncio 

6 

# Populate the process environment from the .env file as soon as the module is imported.
load_dotenv()

9 

def validate_output_path(file_path):
    """Check that ``file_path`` can be written to.

    The parent directory is created when it does not exist yet, so a
    successful validation may leave a new, empty directory behind.

    Args:
        file_path: Path of the file that is going to be written. Relative
            paths are resolved against the current working directory.

    Returns:
        A ``(ok, error)`` tuple: ``(True, None)`` when the path looks
        writable, otherwise ``(False, message)`` describing the problem.
        The function never raises; unexpected errors are reported through
        the message.
    """
    # An empty path resolves to the working directory itself, and the checks
    # below would then inspect its *parent* - reject it up front instead.
    if not file_path:
        return False, "Output file path is empty"

    try:
        target = os.path.abspath(file_path)

        # Opening a directory for writing fails later with a confusing
        # error, so report it here with a clear message.
        if os.path.isdir(target):
            return False, f"'{file_path}' is a directory, not a file"

        directory = os.path.dirname(target)
        if not os.path.exists(directory):
            try:
                os.makedirs(directory, exist_ok=True)
            except PermissionError:
                return False, f"Cannot create directory '{directory}' - permission denied"

        # New files are created inside the directory, so it must be writable.
        if not os.access(directory, os.W_OK):
            return False, f"Directory '{directory}' is not writable"

        # An existing file gets overwritten, so the file itself must be writable too.
        if os.path.exists(file_path) and not os.access(file_path, os.W_OK):
            return False, f"File '{file_path}' exists but is not writable"

        return True, None
    except Exception as e:
        return False, f"Path validation error: {str(e)}"

32 

def validate_fields(field_names):
    """Run basic format checks on the requested field names.

    A name is rejected when it is empty (or whitespace only), contains any
    whitespace character (space, tab, newline, ...), or is longer than 100
    characters.

    Args:
        field_names: Iterable of field-name strings; may be empty or ``None``.

    Returns:
        ``(True, None)`` when every name is acceptable (or none were given),
        otherwise ``(False, message)`` listing each offending name.
    """
    if not field_names:
        return True, None

    problems = []
    for field in field_names:
        if not field.strip():
            problems.append("(empty)")
        elif any(ch.isspace() for ch in field):
            # Covers tabs and newlines too, not only the literal space character.
            problems.append(f"'{field}' (contains spaces)")
        elif len(field) > 100:
            problems.append(f"'{field}' (too long)")

    if problems:
        return False, f"Invalid field names: {', '.join(problems)}"

    return True, None

52 

async def get_client():
    """Create an authenticated 1Password client from the service-account token.

    The token is read from the ``OP_SERVICE_ACCOUNT_TOKEN`` environment
    variable and passed explicitly to ``Client.authenticate``.

    Returns:
        The client object returned by ``Client.authenticate``.

    Raises:
        ValueError: If the token variable is unset or empty. A
            "Configuration Error" line is echoed before raising.
        Exception: Whatever ``Client.authenticate`` raised. An
            "Authentication Error" line is echoed before re-raising.
    """
    token = os.getenv('OP_SERVICE_ACCOUNT_TOKEN')
    if not token:
        error = ValueError("OP_SERVICE_ACCOUNT_TOKEN environment variable is required")
        click.echo(f"Configuration Error: {str(error)}")
        raise error

    # Only the SDK call is guarded, so a ValueError coming out of the SDK is
    # reported as an authentication problem, not as a configuration problem.
    try:
        return await Client.authenticate(
            auth=token,
            integration_name="My 1Password Integration",
            integration_version="v1.0.0",
        )
    except Exception as e:
        click.echo(f"Authentication Error: Failed to connect to 1Password - {str(e)}")
        raise

73 

# Root command group; the docstring below is what click prints as the help text.
@click.group()
def cli():
    """CLI tool to integrate with 1Password for importing/exporting env vars."""

78 

def _format_env_line(key, value):
    """Render one ``KEY=value`` line for a .env file.

    Plain values are written verbatim. Values containing whitespace, ``#``
    or quote characters are double-quoted with backslash escapes so they are
    not truncated or spilled onto following lines when the file is parsed.
    """
    if not any(ch.isspace() or ch in "#'\"" for ch in value):
        return f"{key}={value}\n"
    escaped = (
        value.replace("\\", "\\\\")  # must come first so later escapes are not doubled
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    return f'{key}="{escaped}"\n'


def _env_vars_from_item(item, wanted, debug):
    """Collect ``{label: value}`` from the fields of one 1Password item.

    Args:
        item: Full item object exposing a ``fields`` sequence.
        wanted: Collection of labels to keep; empty means keep every field.
        debug: When true, echo one diagnostic line per field.

    Returns:
        dict mapping each selected field label to its value as a string.
    """
    collected = {}
    for field in item.fields:
        # Field objects may use different attribute names, so probe a few.
        field_id = getattr(field, 'id', getattr(field, 'field_id', 'unknown'))
        field_label = getattr(field, 'title', getattr(field, 'label', field_id))
        field_value = getattr(field, 'value', '')
        field_type = getattr(field, 'type', getattr(field, 'field_type', 'unknown'))
        # A missing value becomes an empty string rather than the text "None".
        text = '' if field_value is None else str(field_value)

        if debug:
            # Never echo any part of the secret itself; its length is enough for debugging.
            click.echo(f"Debug: Processing field '{field_label}' = <{len(text)} chars> (type: {field_type})")
        if not field_label:
            # An unlabeled field would produce an invalid "=value" line.
            continue
        if not wanted or field_label in wanted:
            collected[field_label] = text
    return collected


@cli.command(name='import')
@click.option('--vault', default='tokens', help='1Password vault name (default: tokens)')
@click.option('--file', default='.env', help='Path to output .env file')
@click.option('--name', default=None, help='Specific entry name in vault')
@click.option('--fields', multiple=True, help='Fields to filter (e.g., OPEN_KEY,API_KEY)')
@click.option('--debug', is_flag=True, help='Enable debug output')
def import_env(vault, file, name, fields, debug):
    """Import env vars from 1Password vault to a .env file."""
    # The docstring above doubles as the --help text, so the details live here.
    #
    # vault:  title of the vault to read from.
    # file:   path of the .env file to write (confirmation is asked before overwriting).
    # name:   exact item title to import; when omitted, every item whose title
    #         contains the current directory name is imported.
    # fields: field labels to keep; may be repeated and/or comma-separated.
    # debug:  echo diagnostic output (secret values are never printed).
    #
    # All failures are reported with click.echo and the command returns normally.
    import traceback  # only needed for --debug diagnostics

    valid, error = validate_fields(fields)
    if not valid:
        click.echo(f"Validation Error: {error}")
        return

    # The help text advertises "OPEN_KEY,API_KEY", so accept comma-separated
    # lists in addition to repeating the option.
    requested = tuple(
        part.strip() for raw in fields for part in raw.split(',') if part.strip()
    )
    if fields and not requested:
        # e.g. --fields "," : a filter was asked for but names nothing.
        click.echo("Validation Error: Invalid field names: (empty)")
        return
    fields = requested
    wanted = frozenset(fields)

    output_file = file
    valid, error = validate_output_path(output_file)
    if not valid:
        click.echo(f"Validation Error: {error}")
        return

    project_name = os.path.basename(os.getcwd())
    click.echo(f"Importing from vault '{vault}' for project '{project_name}'...")

    if debug:
        click.echo(f"Debug: Using vault='{vault}', file='{file}', name='{name}', fields={fields}")

    def report(message):
        """Echo an error and, in debug mode, the traceback of the active exception."""
        click.echo(message)
        if debug:
            click.echo(f"Debug: Full traceback:\n{traceback.format_exc()}")

    async def run():
        try:
            client = await get_client()
        except Exception:
            # get_client() has already reported the problem.
            return
        if debug:
            click.echo("Debug: Successfully authenticated with 1Password")

        try:
            # Resolve the vault by title.
            if debug:
                click.echo("Debug: Fetching vault list...")
            vaults = await client.vaults.list()
            if debug:
                click.echo(f"Debug: Found {len(vaults)} vaults: {[v.title for v in vaults]}")

            vault_obj = next((v for v in vaults if v.title == vault), None)
            if not vault_obj:
                click.echo(f"Error: Vault '{vault}' not found. Available vaults: {', '.join([v.title for v in vaults])}")
                return

            if debug:
                click.echo(f"Debug: Using vault '{vault}' (ID: {vault_obj.id})")

            env_vars = {}
            try:
                if name:
                    # Import exactly one item, matched by title.
                    if debug:
                        click.echo(f"Debug: Searching for specific item '{name}'...")
                    items = await client.items.list(vault_obj.id)
                    if debug:
                        click.echo(f"Debug: Found {len(items)} items in vault")
                        click.echo(f"Debug: Item titles: {[i.title for i in items]}")

                    item_overview = next((i for i in items if i.title == name), None)
                    if not item_overview:
                        available_items = [i.title for i in items]
                        click.echo(f"Error: Item '{name}' not found in vault '{vault}'.")
                        if available_items:
                            click.echo(f"Available items: {', '.join(available_items[:5])}{'...' if len(available_items) > 5 else ''}")
                        return

                    # The listing only returns overviews; fields need the full item.
                    if debug:
                        click.echo(f"Debug: Fetching full details for item '{name}' (ID: {item_overview.id})")
                    item = await client.items.get(vault_obj.id, item_overview.id)
                    if debug:
                        click.echo(f"Debug: Item has {len(item.fields)} fields")
                        if item.fields:
                            click.echo(f"Debug: Sample field attributes: {dir(item.fields[0])}")

                    env_vars.update(_env_vars_from_item(item, wanted, debug))
                else:
                    # Import every item whose title contains the project (directory) name.
                    if debug:
                        click.echo(f"Debug: Searching for items containing '{project_name}'...")
                    items = await client.items.list(vault_obj.id)
                    if debug:
                        click.echo(f"Debug: Found {len(items)} total items in vault")

                    matching_items = [item for item in items if project_name in item.title]

                    if not matching_items:
                        click.echo(f"Warning: No items found containing '{project_name}' in vault '{vault}'")
                        if debug:
                            click.echo(f"Debug: Available items: {[i.title for i in items]}")
                        return

                    if debug:
                        click.echo(f"Debug: Found {len(matching_items)} matching items: {[i.title for i in matching_items]}")

                    for position, item_overview in enumerate(matching_items):
                        if debug:
                            click.echo(f"Debug: Fetching full details for item '{item_overview.title}'")
                        item = await client.items.get(vault_obj.id, item_overview.id)
                        if debug:
                            click.echo(f"Debug: Item '{item_overview.title}' has {len(item.fields)} fields")
                            # Field attributes are dumped for the first item only.
                            if item.fields and position == 0:
                                click.echo(f"Debug: Sample field attributes: {dir(item.fields[0])}")

                        # Later items overwrite earlier ones when labels collide.
                        env_vars.update(_env_vars_from_item(item, wanted, debug))

                    click.echo(f"Found {len(matching_items)} matching item(s)")
            except Exception as e:
                report(f"Error: Failed to fetch items from vault - {str(e)}")
                return

            if not env_vars:
                click.echo("Warning: No environment variables found to import")
                return

            if debug:
                click.echo(f"Debug: Collected {len(env_vars)} environment variables")

            # Never clobber an existing file without asking.
            if os.path.exists(output_file):
                if debug:
                    click.echo(f"Debug: File '{output_file}' already exists")

                overwrite = click.confirm(
                    f"File '{output_file}' already exists. Do you want to overwrite it?",
                    default=False
                )

                if not overwrite:
                    click.echo("Operation cancelled. File not modified.")
                    return

                if debug:
                    click.echo(f"Debug: User confirmed overwrite of '{output_file}'")

            try:
                if debug:
                    click.echo(f"Debug: Writing to file '{output_file}'")
                # Explicit UTF-8 so non-ASCII values do not depend on the platform default encoding.
                with open(output_file, 'w', encoding='utf-8') as f:
                    f.write('# Generated from 1Password\n')
                    f.writelines(
                        _format_env_line(key, value) for key, value in env_vars.items()
                    )
                click.echo(f"Successfully imported {len(env_vars)} environment variables to '{output_file}'")
            except PermissionError:
                click.echo(f"Error: Permission denied writing to '{output_file}'")
            except Exception as e:
                click.echo(f"Error: Failed to write file '{output_file}' - {str(e)}")

        except Exception as e:
            report(f"Unexpected error: {str(e)}")

    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        click.echo("\nOperation cancelled by user")
    except Exception as e:
        click.echo(f"Fatal error: {str(e)}")

283 

@cli.command()
@click.option('--vault', default='tokens', help='1Password vault name (default: tokens)')
@click.option('--file', default='.env', help='Path to input .env file')
@click.option('--name', default=None, help='Entry name for vault (defaults to project folder name)')
@click.option('--fields', multiple=True, help='Fields to export (e.g., OPEN_KEY,API_KEY)')
@click.option('--debug', is_flag=True, help='Enable debug output')
def export(vault, file, name, fields, debug):
    """Export env vars from .env file to 1Password vault."""
    # The docstring above doubles as the --help text, so the details live here.
    #
    # vault:  title of the vault to write to.
    # file:   path of the .env file to read.
    # name:   title of the item to create; defaults to the current directory name.
    # fields: variable names to export; may be repeated and/or comma-separated.
    # debug:  echo diagnostic output.
    #
    # All failures are reported with click.echo and the command returns normally.
    valid, error = validate_fields(fields)
    if not valid:
        click.echo(f"Validation Error: {error}")
        return

    # The help text advertises "OPEN_KEY,API_KEY", so accept comma-separated
    # lists in addition to repeating the option.
    requested = tuple(
        part.strip() for raw in fields for part in raw.split(',') if part.strip()
    )
    if fields and not requested:
        # e.g. --fields "," : a filter was asked for but names nothing.
        click.echo("Validation Error: Invalid field names: (empty)")
        return
    fields = requested

    if not os.path.exists(file):
        click.echo(f"Validation Error: Input file '{file}' not found")
        return

    project_name = os.path.basename(os.getcwd())
    entry_name = name or project_name
    click.echo(f"Exporting to vault '{vault}' for entry '{entry_name}'...")

    if debug:
        click.echo(f"Debug: Using vault='{vault}', file='{file}', name='{name}', fields={fields}")

    async def run():
        try:
            client = await get_client()
        except Exception:
            # get_client() has already reported the problem.
            return
        if debug:
            click.echo("Debug: Successfully authenticated with 1Password")

        try:
            # Read the .env file (its existence was checked before authenticating).
            try:
                env_vars = dotenv_values(file)
                if not env_vars:
                    click.echo(f"Warning: No environment variables found in '{file}'")
                    return
            except Exception as e:
                click.echo(f"Error: Failed to read file '{file}' - {str(e)}")
                return

            # Keep only the requested variables, if a filter was given.
            if fields:
                original_count = len(env_vars)
                wanted = frozenset(fields)
                env_vars = {k: v for k, v in env_vars.items() if k in wanted}
                # Report misses in the order the user asked for them (deduplicated),
                # so the message is stable from run to run.
                missing_fields = [f for f in dict.fromkeys(fields) if f not in env_vars]
                if missing_fields:
                    click.echo(f"Warning: Fields not found in file: {', '.join(missing_fields)}")
                click.echo(f"Filtered {original_count} variables down to {len(env_vars)} specified fields")

            if not env_vars:
                click.echo("Error: No environment variables to export")
                return

            # Resolve the vault by title.
            try:
                vaults = await client.vaults.list()
                vault_obj = next((v for v in vaults if v.title == vault), None)
                if not vault_obj:
                    available_vaults = [v.title for v in vaults]
                    click.echo(f"Error: Vault '{vault}' not found. Available vaults: {', '.join(available_vaults)}")
                    return
            except Exception as e:
                click.echo(f"Error: Failed to list vaults - {str(e)}")
                return

            # Create a new item. An existing item with the same title is not
            # looked up or updated here.
            try:
                # dotenv_values() yields None for a bare "KEY" line; store it as an empty value.
                fields_list = [
                    {"label": k, "value": v if v is not None else ""}
                    for k, v in env_vars.items()
                ]
                # NOTE(review): the public 1Password Python SDK documents
                # items.create() as taking a single ItemCreateParams object -
                # confirm this keyword form works with the installed SDK version.
                await client.items.create(
                    vault_id=vault_obj.id,
                    category="LOGIN",  # Using LOGIN as a generic category
                    title=entry_name,
                    fields=fields_list
                )
                click.echo(f"Successfully exported {len(env_vars)} environment variables to vault '{vault}' under entry '{entry_name}'")
            except Exception as e:
                click.echo(f"Error: Failed to create item in vault - {str(e)}")

        except Exception as e:
            click.echo(f"Unexpected error: {str(e)}")

    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        click.echo("\nOperation cancelled by user")
    except Exception as e:
        click.echo(f"Fatal error: {str(e)}")

381 

# Run the command group only when executed as a script, not when imported.
if __name__ == "__main__":
    cli()