Coverage for src/onepass_env/onepassword.py: 0%

93 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-18 11:04 -0300

1"""1Password SDK integration.""" 

2 

3import asyncio 

4import os 

5from typing import Any, Dict, List, Optional 

6 

7from onepassword.client import Client 

8 

9from onepass_env.exceptions import OnePassEnvError, AuthenticationError, VaultError 

10 

11 

12class OnePasswordClient: 

13 """Client for interacting with 1Password using the official SDK.""" 

14 

15 def __init__(self, vault: Optional[str] = None, verbose: bool = False): 

16 """Initialize the 1Password client. 

17  

18 Args: 

19 vault: 1Password vault name or ID 

20 verbose: Enable verbose logging 

21 """ 

22 self.vault = vault 

23 self.verbose = verbose 

24 self._client: Optional[Client] = None 

25 self._vault_id: Optional[str] = None 

26 

27 async def _get_client(self) -> Client: 

28 """Get or create the 1Password client.""" 

29 if self._client is None: 

30 try: 

31 token = os.getenv("OP_SERVICE_ACCOUNT_TOKEN") 

32 if not token: 

33 raise AuthenticationError("OP_SERVICE_ACCOUNT_TOKEN environment variable is not set") 

34 

35 # Authenticate with 1Password using the async SDK 

36 self._client = await Client.authenticate( 

37 auth=token, 

38 integration_name="1pass-env CLI", 

39 integration_version="v0.1.0" 

40 ) 

41 except Exception as e: 

42 raise AuthenticationError( 

43 f"Failed to initialize 1Password client. Make sure OP_SERVICE_ACCOUNT_TOKEN " 

44 f"environment variable is set: {e}" 

45 ) 

46 return self._client 

47 

48 async def _get_vault_id(self) -> str: 

49 """Get the vault ID from the vault name.""" 

50 if not self.vault: 

51 raise VaultError("Vault name is required") 

52 

53 if self._vault_id is None: 

54 try: 

55 client = await self._get_client() 

56 vaults = await client.vaults.list() 

57 

58 # Try to find vault by name or ID 

59 for vault in vaults: 

60 if vault.title == self.vault or vault.id == self.vault: 

61 self._vault_id = vault.id 

62 break 

63 

64 if self._vault_id is None: 

65 raise VaultError(f"Vault '{self.vault}' not found") 

66 

67 except Exception as e: 

68 if isinstance(e, VaultError): 

69 raise 

70 raise VaultError(f"Failed to access vault '{self.vault}': {e}") 

71 

72 return self._vault_id 

73 

74 def is_available(self) -> bool: 

75 """Check if 1Password SDK is available and properly configured.""" 

76 try: 

77 return os.getenv("OP_SERVICE_ACCOUNT_TOKEN") is not None 

78 except Exception: 

79 return False 

80 

81 def is_authenticated(self) -> bool: 

82 """Check if user is authenticated with 1Password.""" 

83 try: 

84 # Run the async check in a sync context 

85 return asyncio.run(self._check_authentication()) 

86 except Exception: 

87 return False 

88 

89 async def _check_authentication(self) -> bool: 

90 """Async helper to check authentication.""" 

91 try: 

92 client = await self._get_client() 

93 # Try to list vaults to test authentication 

94 vaults = await client.vaults.list() 

95 if self.verbose: 

96 print(f"Found {len(vaults)} vaults") 

97 return True 

98 except Exception as e: 

99 if self.verbose: 

100 print(f"Authentication failed: {e}") 

101 return False 

102 

103 def get_item_by_title(self, title: str): 

104 """Get an item by its title.""" 

105 try: 

106 return asyncio.run(self._get_item_by_title_async(title)) 

107 except Exception as e: 

108 raise OnePassEnvError(f"Failed to get item '{title}': {e}") 

109 

110 async def _get_item_by_title_async(self, title: str): 

111 """Async helper to get item by title.""" 

112 if not self.vault: 

113 raise VaultError("Vault name is required") 

114 

115 client = await self._get_client() 

116 vault_id = await self._get_vault_id() 

117 

118 # List all items in the vault 

119 items = await client.items.list(vault_id=vault_id) 

120 

121 # Find item by title 

122 for item_overview in items: 

123 if item_overview.title == title: 

124 # Get the full item details 

125 full_item = await client.items.get(vault_id=vault_id, item_id=item_overview.id) 

126 return full_item 

127 

128 return None 

129 

130 def list_items(self) -> List[Dict[str, Any]]: 

131 """List all items in the vault.""" 

132 try: 

133 return asyncio.run(self._list_items_async()) 

134 except Exception as e: 

135 raise OnePassEnvError(f"Failed to list items: {e}") 

136 

137 async def _list_items_async(self) -> List[Dict[str, Any]]: 

138 """Async helper to list items.""" 

139 client = await self._get_client() 

140 vault_id = await self._get_vault_id() 

141 

142 items = await client.items.list(vault_id=vault_id) 

143 

144 return [ 

145 { 

146 "id": item.id, 

147 "title": item.title, 

148 "category": item.category, 

149 "created_at": getattr(item, 'created_at', None), 

150 "updated_at": getattr(item, 'updated_at', None), 

151 } 

152 for item in items 

153 ] 

154 

155 

156# Sync wrapper functions to make the async API easier to use 

157def run_async(coro): 

158 """Helper to run async functions in sync context.""" 

159 try: 

160 loop = asyncio.get_event_loop() 

161 except RuntimeError: 

162 loop = asyncio.new_event_loop() 

163 asyncio.set_event_loop(loop) 

164 

165 return loop.run_until_complete(coro)