Package zeroinstall :: Package injector :: Module trust
[frames | no frames]

Source Code for Module zeroinstall.injector.trust

  1  """ 
  2  Records who we trust to sign feeds. 
  3   
  4  Trust is divided up into domains, so that it is possible to trust a key 
  5  in some cases and not others. 
  6   
  7  @var trust_db: Singleton trust database instance. 
  8  """ 
  9   
 10  # Copyright (C) 2009, Thomas Leonard 
 11  # See the README file for details, or visit http://0install.net. 
 12   
 13  from zeroinstall import _, SafeException 
 14  import os 
 15  from logging import info 
 16   
 17  from zeroinstall.support import basedir, tasks 
 18  from .namespaces import config_site, config_prog, XMLNS_TRUST 
 19   
 20  KEY_INFO_TIMEOUT = 10   # Maximum time, in seconds, to wait for a response from the key-info server
class TrustDB(object):
    """A database of trusted keys.

    @ivar keys: maps each trusted key fingerprint to the set of domains in which it is trusted
    @type keys: {str: set(str)}
    @ivar watchers: callbacks invoked by L{notify}
    @see: L{trust_db} - the singleton instance of this class"""
    __slots__ = ['keys', 'watchers']

    def __init__(self):
        """Create a database that has not been loaded yet.
        C{keys} stays None until L{ensure_uptodate} is called."""
        self.keys = None
        self.watchers = []

    def is_trusted(self, fingerprint, domain = None):
        """Check whether a key is trusted.
        @param fingerprint: fingerprint of the key to look up
        @type fingerprint: str
        @param domain: the domain to check, or None to ask whether the key
        is trusted in any domain at all (deprecated)
        @type domain: str | None
        @return: True if the key is trusted for C{domain}, either explicitly
        or through the '*' wildcard
        @rtype: bool"""
        self.ensure_uptodate()

        domains = self.keys.get(fingerprint, None)
        if not domains:
            return False    # Unknown key

        if domain is None:
            return True     # Deprecated

        return domain in domains or '*' in domains

    def get_trust_domains(self, fingerprint):
        """Return the set of domains in which this key is trusted.
        If the list includes '*' then the key is trusted everywhere.
        @since: 0.27
        """
        self.ensure_uptodate()
        return self.keys.get(fingerprint, set())

    def get_keys_for_domain(self, domain):
        """Return the set of keys trusted for this domain.
        Only keys that list C{domain} itself are returned; the '*' wildcard
        is not expanded here.
        @since: 0.27"""
        self.ensure_uptodate()
        return set(fp for fp in self.keys if domain in self.keys[fp])

    def trust_key(self, fingerprint, domain = '*'):
        """Add key to the list of trusted fingerprints.
        @param fingerprint: base 16 fingerprint without any spaces
        @type fingerprint: str
        @param domain: domain in which key is to be trusted
        @type domain: str
        @raise ValueError: if C{fingerprint} is not a base 16 number
        @note: call L{notify} after trusting one or more new keys"""
        if self.is_trusted(fingerprint, domain):
            return

        int(fingerprint, 16)        # Ensure fingerprint is valid

        if fingerprint not in self.keys:
            self.keys[fingerprint] = set()

        # NOTE: the default domain '*' trusts the key everywhere.
        self.keys[fingerprint].add(domain)
        self.save()

    def untrust_key(self, key, domain = '*'):
        """Stop trusting a key in one domain and save the database.
        @param key: fingerprint of the key
        @type key: str
        @param domain: the domain entry to remove
        @type domain: str
        @raise KeyError: if C{key} is not currently trusted for C{domain}"""
        self.ensure_uptodate()
        self.keys[key].remove(domain)

        if not self.keys[key]:
            # No more domains for this key
            del self.keys[key]

        self.save()

    def save(self):
        """Write C{keys} to 'trustdb.xml' in the configuration directory.

        The XML is first written to a temporary file in the same directory,
        which is then renamed over the previous database. If anything goes
        wrong the temporary file is closed and removed before the error is
        propagated."""
        from xml.dom import minidom
        import tempfile

        doc = minidom.Document()
        root = doc.createElementNS(XMLNS_TRUST, 'trusted-keys')
        root.setAttribute('xmlns', XMLNS_TRUST)
        doc.appendChild(root)

        for fingerprint in self.keys:
            keyelem = doc.createElementNS(XMLNS_TRUST, 'key')
            root.appendChild(keyelem)
            keyelem.setAttribute('fingerprint', fingerprint)
            for domain in self.keys[fingerprint]:
                domainelem = doc.createElementNS(XMLNS_TRUST, 'domain')
                domainelem.setAttribute('value', domain)
                keyelem.appendChild(domainelem)

        # Produces the same text as writexml(f, indent = "", addindent = " ", newl = "\n").
        # The file is opened in binary mode, so make sure we hand it bytes
        # (an XML document without an encoding declaration is UTF-8).
        xml_data = doc.toprettyxml(indent = " ", newl = "\n")
        if not isinstance(xml_data, bytes):
            xml_data = xml_data.encode('utf-8')

        d = basedir.save_config_path(config_site, config_prog)
        fd, tmpname = tempfile.mkstemp(dir = d, prefix = 'trust-')
        renamed = False
        try:
            tmp = os.fdopen(fd, 'wb')
            try:
                tmp.write(xml_data)
            finally:
                tmp.close()

            os.rename(tmpname, os.path.join(d, 'trustdb.xml'))
            renamed = True
        finally:
            if not renamed:
                # Best-effort cleanup so failed saves don't leave 'trust-*' files behind;
                # the original error (if any) is the one worth reporting.
                try:
                    os.unlink(tmpname)
                except OSError:
                    pass

    def notify(self):
        """Call all watcher callbacks.
        This should be called after trusting or untrusting one or more new keys.
        @since: 0.25"""
        for w in self.watchers:
            w()

    def ensure_uptodate(self):
        """Reload C{keys} from the configuration files.

        Sources are tried in this order: 'trustdb.xml'; the old plain-text
        'trust' file (one fingerprint per line, each trusted for '*'); and
        finally a built-in default that trusts a single key for '0install.net'."""
        from xml.dom import minidom

        # This is a bit inefficient... (could cache things)
        self.keys = {}

        trust = basedir.load_first_config(config_site, config_prog, 'trustdb.xml')
        if trust:
            keys = minidom.parse(trust).documentElement
            for key in keys.getElementsByTagNameNS(XMLNS_TRUST, 'key'):
                domains = set()
                self.keys[key.getAttribute('fingerprint')] = domains
                for domain in key.getElementsByTagNameNS(XMLNS_TRUST, 'domain'):
                    domains.add(domain.getAttribute('value'))
        else:
            # Load the old-format database (it is written back as XML by the next save())
            trust = basedir.load_first_config(config_site, config_prog, 'trust')
            if trust:
                stream = open(trust)
                try:
                    old_keys = stream.read().split('\n')
                finally:
                    stream.close()
                for key in old_keys:
                    if key:
                        self.keys[key] = set(['*'])
            else:
                # No trust database found.
                # Trust Thomas Leonard's key for 0install.net by default.
                # Avoids distracting confirmation box on first run when we check
                # for updates to the GUI.
                self.keys['92429807C9853C0744A68B9AAE07828059A53CC1'] = set(['0install.net'])
150
def domain_from_url(url):
    """Extract the trust domain for a URL.
    The domain is the network-location part of the URL, exactly as written.
    @param url: the feed's URL
    @type url: str
    @return: the trust domain
    @rtype: str
    @since: 0.27
    @raise SafeException: the URL can't be parsed"""
    # The module was renamed in Python 3; support both spellings.
    try:
        from urlparse import urlparse
    except ImportError:
        from urllib.parse import urlparse

    if os.path.isabs(url):
        raise SafeException(_("Can't get domain from a local path: '%s'") % url)

    domain = urlparse(url)[1]
    # '*' is reserved as the "trusted everywhere" wildcard, so it can't be a real domain
    if domain and domain != '*':
        return domain
    raise SafeException(_("Can't extract domain from URL '%s'") % url)
166 167 trust_db = TrustDB()   # Singleton trust database instance (see the module docstring)
class TrustMgr(object):
    """A TrustMgr handles the process of deciding whether to trust new keys
    (contacting the key information server, prompting the user, accepting automatically, etc)
    @ivar config: the configuration object supplying the fetcher, handler, interface cache
    and the auto_approve_keys setting used by L{confirm_keys}
    @since: 0.53"""

    __slots__ = ['config', '_current_confirm']

    def __init__(self, config):
        """@param config: the configuration to use when confirming keys"""
        self.config = config
        self._current_confirm = None    # (a lock to prevent asking the user multiple questions at once)

    # "async" is a reserved word from Python 3.7 on, so the decorator can't be
    # spelled "tasks.async" there; look it up by name instead.
    @getattr(tasks, 'async')
    def confirm_keys(self, pending):
        """We don't trust any of the signatures yet. Collect information about them and add the keys to the
        trusted list, possibly after confirming with the user (via config.handler).
        Updates the L{trust} database, and then calls L{trust.TrustDB.notify}.
        @since: 0.53
        @arg pending: an object holding details of the updated feed
        @type pending: L{PendingFeed}
        @return: A blocker that triggers when the user has chosen, or None if already done.
        @rtype: None | L{Blocker}
        @raise SafeException: if none of the signatures is valid, or no trust domain
        can be extracted from the feed's URL"""

        assert pending.sigs

        from zeroinstall.injector import gpg
        valid_sigs = [s for s in pending.sigs if isinstance(s, gpg.ValidSig)]
        if not valid_sigs:
            def format_sig(sig):
                msg = str(sig)
                if sig.messages:
                    msg += "\nMessages from GPG:\n" + sig.messages
                return msg
            raise SafeException(_('No valid signatures found on "%(url)s". Signatures:%(signatures)s') %
                    {'url': pending.url, 'signatures': ''.join(['\n- ' + format_sig(s) for s in pending.sigs])})

        # Start downloading information about the keys...
        fetcher = self.config.fetcher
        kfs = {}
        for sig in valid_sigs:
            kfs[sig] = fetcher.fetch_key_info(sig.fingerprint)

        # Wait up to KEY_INFO_TIMEOUT seconds for key information to arrive. Avoids having the dialog
        # box update while the user is looking at it, and may allow it to be skipped completely in some
        # cases.
        timeout = tasks.TimeoutBlocker(KEY_INFO_TIMEOUT, "key info timeout")
        while True:
            key_info_blockers = [sig_info.blocker for sig_info in kfs.values() if sig_info.blocker is not None]
            if not key_info_blockers:
                break
            info("Waiting for response from key-info server: %s", key_info_blockers)
            yield [timeout] + key_info_blockers
            if timeout.happened:
                info("Timeout waiting for key info response")
                break

        # If we're already confirming something else, wait for that to finish...
        while self._current_confirm is not None:
            info("Waiting for previous key confirmations to finish")
            yield self._current_confirm

        domain = domain_from_url(pending.url)

        # Only feeds we have never seen before are eligible for automatic approval.
        if self.config.auto_approve_keys:
            existing_feed = self.config.iface_cache.get_feed(pending.url)
            if not existing_feed:
                changes = False
                for sig, kf in kfs.items():
                    for key_info in kf.info:
                        if key_info.getAttribute("vote") == "good":
                            info(_("Automatically approving key for new feed %s based on response from key info server"), pending.url)
                            trust_db.trust_key(sig.fingerprint, domain)
                            changes = True
                if changes:
                    trust_db.notify()

        # Check whether we still need to confirm. The user may have
        # already approved one of the keys while dealing with another
        # feed, or we may have just auto-approved it.
        for sig in kfs:
            is_trusted = trust_db.is_trusted(sig.fingerprint, domain)
            if is_trusted:
                return

        # Take the lock and confirm this feed
        self._current_confirm = lock = tasks.Blocker('confirm key lock')
        try:
            done = self.config.handler.confirm_import_feed(pending, kfs)
            if done is not None:
                yield done
                tasks.check(done)
        finally:
            # Always release the lock, even if the confirmation failed, so that
            # other pending confirmations are not blocked forever.
            self._current_confirm = None
            lock.trigger()
262