Package zeroinstall :: Package injector :: Module gpg
[frames] | no frames]

Source Code for Module zeroinstall.injector.gpg

  1  """ 
  2  Python interface to GnuPG. 
  3   
  4  This module is used to invoke GnuPG to check the digital signatures on interfaces. 
  5   
  6  @see: L{iface_cache.PendingFeed} 
  7  """ 
  8   
  9  # Copyright (C) 2009, Thomas Leonard 
 10  # See the README file for details, or visit http://0install.net. 
 11   
 12  from zeroinstall import _ 
 13  import subprocess 
 14  import base64, re 
 15  import os 
 16  import tempfile 
 17  from logging import info, warn 
 18   
 19  from zeroinstall.support import find_in_path, basedir 
 20  from zeroinstall.injector.trust import trust_db 
 21  from zeroinstall.injector.model import SafeException 
 22   
 23  _gnupg_options = None 
24 -def _run_gpg(args, **kwargs):
25 global _gnupg_options 26 if _gnupg_options is None: 27 gpg_path = find_in_path('gpg') or find_in_path('gpg2') or 'gpg' 28 _gnupg_options = [gpg_path, '--no-secmem-warning'] 29 30 if hasattr(os, 'geteuid') and os.geteuid() == 0 and 'GNUPGHOME' not in os.environ: 31 _gnupg_options += ['--homedir', os.path.join(basedir.home, '.gnupg')] 32 info(_("Running as root, so setting GnuPG home to %s"), _gnupg_options[-1]) 33 34 return subprocess.Popen(_gnupg_options + args, **kwargs)
35
36 -class Signature(object):
37 """Abstract base class for signature check results. 38 @ivar status: the raw data returned by GPG 39 @ivar messages: any messages printed by GPG which may be relevant to this signature 40 """ 41 status = None 42 messages = None 43
44 - def __init__(self, status):
45 self.status = status
46
47 - def is_trusted(self, domain = None):
48 """Whether this signature is trusted by the user.""" 49 return False
50
51 - def need_key(self):
52 """Returns the ID of the key that must be downloaded to check this signature.""" 53 return None
54
55 -class ValidSig(Signature):
56 """A valid signature check result.""" 57 FINGERPRINT = 0 58 TIMESTAMP = 2 59
60 - def __str__(self):
61 return "Valid signature from " + self.status[self.FINGERPRINT]
62
63 - def is_trusted(self, domain = None):
64 """Asks the L{trust.trust_db}.""" 65 return trust_db.is_trusted(self.status[self.FINGERPRINT], domain)
66
67 - def get_timestamp(self):
68 """Get the time this signature was made.""" 69 return int(self.status[self.TIMESTAMP])
70 71 fingerprint = property(lambda self: self.status[self.FINGERPRINT]) 72
73 - def get_details(self):
74 """Call 'gpg --list-keys' and return the results split into lines and columns. 75 @rtype: [[str]]""" 76 # Note: GnuPG 2 always uses --fixed-list-mode 77 child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys', self.fingerprint], stdout = subprocess.PIPE) 78 cout, unused = child.communicate() 79 if child.returncode: 80 info(_("GPG exited with code %d") % child.returncode) 81 details = [] 82 for line in cout.split('\n'): 83 details.append(line.split(':')) 84 return details
85
86 -class BadSig(Signature):
87 """A bad signature (doesn't match the message).""" 88 KEYID = 0 89
90 - def __str__(self):
91 return _("BAD signature by %s (the message has been tampered with)") \ 92 % self.status[self.KEYID]
93
94 -class ErrSig(Signature):
95 """Error while checking a signature.""" 96 KEYID = 0 97 ALG = 1 98 RC = -1 99
100 - def __str__(self):
101 msg = _("ERROR signature by %s: ") % self.status[self.KEYID] 102 rc = int(self.status[self.RC]) 103 if rc == 4: 104 msg += _("Unknown or unsupported algorithm '%s'") % self.status[self.ALG] 105 elif rc == 9: 106 msg += _("Unknown key. Try 'gpg --recv-key %s'") % self.status[self.KEYID] 107 else: 108 msg += _("Unknown reason code %d") % rc 109 return msg
110
111 - def need_key(self):
112 rc = int(self.status[self.RC]) 113 if rc == 9: 114 return self.status[self.KEYID] 115 return None
116
117 -class Key:
118 """A GPG key. 119 @since: 0.27 120 @param fingerprint: the fingerprint of the key 121 @type fingerprint: str 122 @ivar name: a short name for the key, extracted from the full name 123 @type name: str 124 """
125 - def __init__(self, fingerprint):
126 self.fingerprint = fingerprint 127 self.name = '(unknown)'
128
129 - def get_short_name(self):
130 return self.name.split(' (', 1)[0].split(' <', 1)[0]
131
132 -def load_keys(fingerprints):
133 """Load a set of keys at once. 134 This is much more efficient than making individual calls to L{load_key}. 135 @return: a list of loaded keys, indexed by fingerprint 136 @rtype: {str: L{Key}} 137 @since: 0.27""" 138 import codecs 139 140 keys = {} 141 142 # Otherwise GnuPG returns everything... 143 if not fingerprints: return keys 144 145 for fp in fingerprints: 146 keys[fp] = Key(fp) 147 148 current_fpr = None 149 current_uid = None 150 151 child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys', 152 '--with-fingerprint', '--with-fingerprint'] + fingerprints, stdout = subprocess.PIPE) 153 try: 154 for line in child.stdout: 155 if line.startswith('pub:'): 156 current_fpr = None 157 current_uid = None 158 if line.startswith('fpr:'): 159 current_fpr = line.split(':')[9] 160 if current_fpr in keys and current_uid: 161 # This is probably a subordinate key, where the fingerprint 162 # comes after the uid, not before. Note: we assume the subkey is 163 # cross-certified, as recent always ones are. 164 try: 165 keys[current_fpr].name = codecs.decode(current_uid, 'utf-8') 166 except: 167 warn("Not UTF-8: %s", current_uid) 168 keys[current_fpr].name = current_uid 169 if line.startswith('uid:'): 170 assert current_fpr is not None 171 # Only take primary UID 172 if current_uid: continue 173 parts = line.split(':') 174 current_uid = parts[9] 175 if current_fpr in keys: 176 keys[current_fpr].name = current_uid 177 finally: 178 if child.wait(): 179 warn(_("gpg --list-keys failed with exit code %d") % child.returncode) 180 181 return keys
182
183 -def load_key(fingerprint):
184 """Query gpg for information about this key. 185 @return: a new key 186 @rtype: L{Key} 187 @since: 0.27""" 188 return load_keys([fingerprint])[fingerprint]
189
190 -def import_key(stream):
191 """Run C{gpg --import} with this stream as stdin.""" 192 errors = tempfile.TemporaryFile() 193 194 child = _run_gpg(['--quiet', '--import', '--batch'], 195 stdin = stream, stderr = errors) 196 197 status = child.wait() 198 199 errors.seek(0) 200 error_messages = errors.read().strip() 201 errors.close() 202 203 if error_messages: 204 import codecs 205 decoder = codecs.lookup('utf-8') 206 error_messages = decoder.decode(error_messages, errors = 'replace')[0] 207 208 if status != 0: 209 if error_messages: 210 raise SafeException(_("Errors from 'gpg --import':\n%s") % error_messages) 211 else: 212 raise SafeException(_("Non-zero exit code %d from 'gpg --import'") % status) 213 elif error_messages: 214 warn(_("Warnings from 'gpg --import':\n%s") % error_messages)
215
216 -def _check_xml_stream(stream):
217 xml_comment_start = '<!-- Base64 Signature' 218 219 data_to_check = stream.read() 220 221 last_comment = data_to_check.rfind('\n' + xml_comment_start) 222 if last_comment < 0: 223 raise SafeException(_("No signature block in XML. Maybe this file isn't signed?")) 224 last_comment += 1 # Include new-line in data 225 226 data = tempfile.TemporaryFile() 227 data.write(data_to_check[:last_comment]) 228 data.flush() 229 os.lseek(data.fileno(), 0, 0) 230 231 errors = tempfile.TemporaryFile() 232 233 sig_lines = data_to_check[last_comment:].split('\n') 234 if sig_lines[0].strip() != xml_comment_start: 235 raise SafeException(_('Bad signature block: extra data on comment line')) 236 while sig_lines and not sig_lines[-1].strip(): 237 del sig_lines[-1] 238 if sig_lines[-1].strip() != '-->': 239 raise SafeException(_('Bad signature block: last line is not end-of-comment')) 240 sig_data = '\n'.join(sig_lines[1:-1]) 241 242 if re.match('^[ A-Za-z0-9+/=\n]+$', sig_data) is None: 243 raise SafeException(_("Invalid characters found in base 64 encoded signature")) 244 try: 245 sig_data = base64.decodestring(sig_data) # (b64decode is Python 2.4) 246 except Exception as ex: 247 raise SafeException(_("Invalid base 64 encoded signature: %s") % str(ex)) 248 249 sig_fd, sig_name = tempfile.mkstemp(prefix = 'injector-sig-') 250 try: 251 sig_file = os.fdopen(sig_fd, 'w') 252 sig_file.write(sig_data) 253 sig_file.close() 254 255 # Note: Should ideally close status_r in the child, but we want to support Windows too 256 child = _run_gpg([# Not all versions support this: 257 #'--max-output', str(1024 * 1024), 258 '--batch', 259 # Windows GPG can only cope with "1" here 260 '--status-fd', '1', 261 # Don't try to download missing keys; we'll do that 262 '--keyserver-options', 'no-auto-key-retrieve', 263 '--verify', sig_name, '-'], 264 stdin = data, 265 stdout = subprocess.PIPE, 266 stderr = errors) 267 268 try: 269 sigs = _get_sigs_from_gpg_status_stream(child.stdout, child, errors) 270 finally: 271 os.lseek(stream.fileno(), 0, 0) 272 stream.seek(0) 273 finally: 274 os.unlink(sig_name) 275 return (stream, sigs)
276
277 -def check_stream(stream):
278 """Pass stream through gpg --decrypt to get the data, the error text, 279 and a list of signatures (good or bad). If stream starts with "<?xml " 280 then get the signature from a comment at the end instead (and the returned 281 data is the original stream). stream must be seekable. 282 @note: Stream returned may or may not be the one passed in. Be careful! 283 @return: (data_stream, [Signatures])""" 284 285 stream.seek(0) 286 287 start = stream.read(6) 288 stream.seek(0) 289 if start == "<?xml ": 290 return _check_xml_stream(stream) 291 elif start == '-----B': 292 raise SafeException(_("Plain GPG-signed feeds no longer supported")) 293 else: 294 raise SafeException(_("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s") % repr(stream.read(120)))
295
296 -def _get_sigs_from_gpg_status_stream(status_r, child, errors):
297 """Read messages from status_r and collect signatures from it. 298 When done, reap 'child'. 299 If there are no signatures, throw SafeException (using errors 300 for the error message if non-empty).""" 301 sigs = [] 302 303 # Should we error out on bad signatures, even if there's a good 304 # signature too? 305 306 for line in status_r: 307 assert line.endswith('\n') 308 if not line.startswith('[GNUPG:] '): 309 # The docs says every line starts with this, but if auto-key-retrieve 310 # is on then they might not. See bug #3420548 311 warn("Invalid output from GnuPG: %r", line) 312 continue 313 314 line = line[9:-1] 315 split_line = line.split(' ') 316 code = split_line[0] 317 args = split_line[1:] 318 if code == 'VALIDSIG': 319 sigs.append(ValidSig(args)) 320 elif code == 'BADSIG': 321 sigs.append(BadSig(args)) 322 elif code == 'ERRSIG': 323 sigs.append(ErrSig(args)) 324 325 child.wait() # (ignore exit status) 326 327 errors.seek(0) 328 329 error_messages = errors.read().strip() 330 errors.close() 331 332 if not sigs: 333 if error_messages: 334 raise SafeException(_("No signatures found. Errors from GPG:\n%s") % error_messages) 335 else: 336 raise SafeException(_("No signatures found. No error messages from GPG.")) 337 elif error_messages: 338 # Attach the warnings to all the signatures, in case they're useful. 339 for s in sigs: 340 s.messages = error_messages 341 342 return sigs
343