1 """
2 Python interface to GnuPG.
3
4 This module is used to invoke GnuPG to check the digital signatures on interfaces.
5
6 @see: L{iface_cache.PendingFeed}
7 """
8
9
10
11
12 from zeroinstall import _
13 import subprocess
14 import base64, re
15 import os
16 import tempfile
17 from logging import info, warn
18
19 from zeroinstall.support import find_in_path, basedir
20 from zeroinstall.injector.trust import trust_db
21 from zeroinstall.injector.model import SafeException
22
23 _gnupg_options = None
35
class Signature(object):
    """Abstract base class for signature check results.

    @ivar status: the raw data returned by GPG
    @ivar messages: any messages printed by GPG which may be relevant to this signature
    """
    status = None
    messages = None

    def __init__(self, status):
        """Remember the raw status data for this signature.

        NOTE(review): the constructor was missing from the listing and has
        been reconstructed; subclasses index C{self.status} by position and
        are created with a single list argument.

        @param status: the arguments of the GPG status line for this signature
        """
        self.status = status

    def is_trusted(self, domain=None):
        """Whether this signature is trusted by the user.

        The base implementation never trusts anything.

        @param domain: NOTE(review): reconstructed parameter; presumably the
            domain the trust decision applies to -- confirm against callers
        """
        return False

    def need_key(self):
        """Returns the ID of the key that must be downloaded to check this signature.

        The base implementation needs no key and returns None.
        """
        return None
54
class ValidSig(Signature):
    """A valid signature check result."""
    # Positions of the fields within self.status.
    FINGERPRINT = 0
    TIMESTAMP = 2

    # NOTE(review): the listing this class was recovered from has gaps where
    # two short members used to be (probably __str__ and an is_trusted
    # override). Their bodies are not visible, so they are NOT recreated
    # here; until they are restored this class inherits is_trusted() == False.

    def get_timestamp(self):
        """Get the time this signature was made."""
        return int(self.status[self.TIMESTAMP])

    fingerprint = property(lambda self: self.status[self.FINGERPRINT])

    def get_details(self):
        """Call 'gpg --list-keys' and return the results split into lines and columns.

        A non-zero exit status from gpg is only logged; whatever output was
        produced is still returned.

        @rtype: [[str]]"""
        child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys', self.fingerprint],
                         stdout=subprocess.PIPE)
        cout, unused = child.communicate()
        if child.returncode:
            info(_("GPG exited with code %d") % child.returncode)
        # Pipes deliver bytes on Python 3; splitting bytes with a str
        # separator raises TypeError. (On Python 2 bytes is str: no change.)
        if isinstance(cout, bytes) and not isinstance(cout, str):
            cout = cout.decode('utf-8', 'replace')
        return [line.split(':') for line in cout.split('\n')]
85
class BadSig(Signature):
    """A bad signature (doesn't match the message)."""
    # Position of the key ID within self.status.
    KEYID = 0

    def __str__(self):
        """Describe the failure, naming the key ID that produced the signature."""
        return _("BAD signature by %s (the message has been tampered with)") \
            % self.status[self.KEYID]
93
class ErrSig(Signature):
    """Error while checking a signature."""
    # Positions of the fields within self.status (RC is the last field).
    KEYID = 0
    ALG = 1
    RC = -1

    # Reason codes this class knows how to explain.
    _RC_UNSUPPORTED_ALGORITHM = 4
    _RC_MISSING_KEY = 9

    def _reason_code(self):
        """Return the numeric reason code from the status line."""
        return int(self.status[self.RC])

    def __str__(self):
        """Describe the error, with a hint for the reason codes we recognise."""
        msg = _("ERROR signature by %s: ") % self.status[self.KEYID]
        rc = self._reason_code()
        if rc == self._RC_UNSUPPORTED_ALGORITHM:
            msg += _("Unknown or unsupported algorithm '%s'") % self.status[self.ALG]
        elif rc == self._RC_MISSING_KEY:
            msg += _("Unknown key. Try 'gpg --recv-key %s'") % self.status[self.KEYID]
        else:
            msg += _("Unknown reason code %d") % rc
        return msg

    def need_key(self):
        """Return the ID of the key to download, or None.

        A key ID is returned only when the check failed because the key is
        unknown (reason code 9).
        """
        if self._reason_code() == self._RC_MISSING_KEY:
            return self.status[self.KEYID]
        return None
116
class Key(object):
    """A GPG key.
    @since: 0.27
    @param fingerprint: the fingerprint of the key
    @type fingerprint: str
    @ivar name: a short name for the key, extracted from the full name
    @type name: str
    """

    def __init__(self, fingerprint):
        """Create a key record whose name is not yet known.

        NOTE(review): the constructor was missing from the listing; the
        placeholder name below is a reconstruction -- confirm.
        """
        self.fingerprint = fingerprint
        self.name = '(unknown)'

    def get_short_name(self):
        """Return the name with any trailing " (comment)" and " <email>" removed."""
        without_comment = self.name.split(' (', 1)[0]
        return without_comment.split(' <', 1)[0]
131
def load_keys(fingerprints):
    """Load a set of keys at once.
    This is much more efficient than making individual calls to L{load_key}.

    Every requested fingerprint gets an entry in the result, even if gpg
    reports nothing for it (its name is then left at the L{Key} default).

    @param fingerprints: the fingerprints to look up (any iterable of str)
    @return: a list of loaded keys, indexed by fingerprint
    @rtype: {str: L{Key}}
    @since: 0.27"""
    import codecs

    keys = {}

    # Materialise first: this lets callers pass any iterable, and makes the
    # emptiness test below reliable for iterators too.
    fingerprints = list(fingerprints) if fingerprints else []

    # With no key arguments 'gpg --list-keys' lists the entire keyring.
    if not fingerprints:
        return keys

    for fp in fingerprints:
        keys[fp] = Key(fp)

    current_fpr = None
    current_uid = None

    child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys',
                      '--with-fingerprint', '--with-fingerprint'] + fingerprints,
                     stdout=subprocess.PIPE)
    try:
        for line in child.stdout:
            if line.startswith('pub:'):
                # A new public key record begins.
                current_fpr = None
                current_uid = None
            if line.startswith('fpr:'):
                current_fpr = line.split(':')[9]
                if current_fpr in keys and current_uid:
                    # A fingerprint that follows a uid within the same record
                    # (presumably a subkey): name it after that uid.
                    try:
                        keys[current_fpr].name = codecs.decode(current_uid, 'utf-8')
                    except (UnicodeError, TypeError):
                        # Best effort: keep the undecoded value.
                        warn("Not UTF-8: %s", current_uid)
                        keys[current_fpr].name = current_uid
            if line.startswith('uid:'):
                assert current_fpr is not None
                # Only the first uid of each record is used.
                if current_uid:
                    continue
                parts = line.split(':')
                current_uid = parts[9]
                if current_fpr in keys:
                    keys[current_fpr].name = current_uid
    finally:
        # Close our end of the pipe before waiting, so that gpg cannot block
        # forever on a full pipe if we stopped reading early.
        child.stdout.close()
        if child.wait():
            warn(_("gpg --list-keys failed with exit code %d") % child.returncode)

    return keys
182
def load_key(fingerprint):
    """Query gpg for information about this key.

    This is a convenience wrapper around L{load_keys} for a single key.

    @param fingerprint: the fingerprint of the key to look up
    @return: a new key
    @rtype: L{Key}
    @since: 0.27"""
    loaded = load_keys([fingerprint])
    return loaded[fingerprint]
189
def import_key(stream):
    """Run C{gpg --import} with this stream as stdin.

    Anything gpg prints on stderr is collected. If gpg fails, it is raised
    as a L{SafeException}; if gpg succeeds, it is logged as a warning.

    @param stream: the key data; handed to the child process as its stdin
    @raise SafeException: if gpg exits with a non-zero status
    """
    errors = tempfile.TemporaryFile()
    try:
        child = _run_gpg(['--quiet', '--import', '--batch'],
                         stdin=stream, stderr=errors)
        status = child.wait()

        errors.seek(0)
        error_messages = errors.read().strip()
    finally:
        # Release the temporary file even if gpg could not be started.
        errors.close()

    if error_messages:
        # gpg's output may not be valid UTF-8; never fail just to report it.
        error_messages = error_messages.decode('utf-8', 'replace')

    if status != 0:
        if error_messages:
            raise SafeException(_("Errors from 'gpg --import':\n%s") % error_messages)
        else:
            raise SafeException(_("Non-zero exit code %d from 'gpg --import'") % status)
    elif error_messages:
        warn(_("Warnings from 'gpg --import':\n%s") % error_messages)
215
def _check_xml_stream(stream):
    """Verify the signature stored in a comment at the end of an XML document.

    The last line beginning with '<!-- Base64 Signature' starts the
    signature block. Everything before it is the signed data; the lines
    between it and the closing '-->' are the base64-encoded signature.

    @param stream: the document; it is read in full and rewound afterwards
        (a real file descriptor is required, as fileno() is used)
    @return: (stream, [Signatures])
    @raise SafeException: if the signature block is missing or malformed
    """
    xml_comment_start = '<!-- Base64 Signature'

    data_to_check = stream.read()

    last_comment = data_to_check.rfind('\n' + xml_comment_start)
    if last_comment < 0:
        raise SafeException(_("No signature block in XML. Maybe this file isn't signed?"))
    last_comment += 1    # The new-line itself belongs to the signed data.

    # Validate and decode the signature before creating any temporary files.
    sig_lines = data_to_check[last_comment:].split('\n')
    if sig_lines[0].strip() != xml_comment_start:
        raise SafeException(_('Bad signature block: extra data on comment line'))
    while sig_lines and not sig_lines[-1].strip():
        del sig_lines[-1]
    if sig_lines[-1].strip() != '-->':
        raise SafeException(_('Bad signature block: last line is not end-of-comment'))
    sig_data = '\n'.join(sig_lines[1:-1])

    if re.match('^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
        raise SafeException(_("Invalid characters found in base 64 encoded signature"))
    try:
        # b64decode exists on every Python version; base64.decodestring was
        # removed in Python 3.9.
        sig_data = base64.b64decode(sig_data)
    except Exception as ex:
        raise SafeException(_("Invalid base 64 encoded signature: %s") % str(ex))

    data = tempfile.TemporaryFile()
    errors = tempfile.TemporaryFile()
    sig_name = None
    child = None
    try:
        data.write(data_to_check[:last_comment])
        data.flush()
        # Rewind the descriptor itself: the child reads from it directly.
        os.lseek(data.fileno(), 0, 0)

        sig_fd, sig_name = tempfile.mkstemp(prefix='injector-sig-')
        # The decoded signature is binary data, so write it in binary mode.
        with os.fdopen(sig_fd, 'wb') as sig_file:
            sig_file.write(sig_data)

        child = _run_gpg([
            '--batch',
            # Machine-readable status lines go to fd 1, i.e. our pipe.
            '--status-fd', '1',
            '--keyserver-options', 'no-auto-key-retrieve',
            # Detached signature in sig_name; signed data ('-') from stdin.
            '--verify', sig_name, '-'],
            stdin=data,
            stdout=subprocess.PIPE,
            stderr=errors)

        try:
            sigs = _get_sigs_from_gpg_status_stream(child.stdout, child, errors)
        finally:
            # Leave the caller's stream rewound, whatever happened.
            os.lseek(stream.fileno(), 0, 0)
            stream.seek(0)
    finally:
        if child is not None:
            child.stdout.close()
        if sig_name is not None:
            os.unlink(sig_name)
        data.close()
        errors.close()    # Harmless if it has already been closed.
    return (stream, sigs)
276
def check_stream(stream):
    """Check the signature on a feed and return the signatures found.

    Only XML documents are accepted: if stream starts with "<?xml " the
    signature is taken from a comment at the end of the document and the
    returned data is the original stream. Plain GPG-signed feeds and
    anything else are rejected. stream must be seekable.

    @note: Stream returned may or may not be the one passed in. Be careful!
    @return: (data_stream, [Signatures])
    @raise SafeException: if the stream is not an XML document"""
    stream.seek(0)
    start = stream.read(6)
    stream.seek(0)

    if start == "<?xml ":
        return _check_xml_stream(stream)
    if start == '-----B':
        raise SafeException(_("Plain GPG-signed feeds no longer supported"))
    # Show the beginning of the data to help explain the failure.
    raise SafeException(_("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s") % repr(stream.read(120)))
295
def _get_sigs_from_gpg_status_stream(status_r, child, errors):
    """Read messages from status_r and collect signatures from it.
    When done, reap 'child'.
    If there are no signatures, throw SafeException (using errors
    for the error message if non-empty).

    @param status_r: stream of GPG status lines ('[GNUPG:] CODE args...')
    @param child: the gpg process to wait for
    @param errors: file containing gpg's stderr output; it is read and closed
    @return: the signatures found, in the order reported
    @raise SafeException: if no signature was reported"""
    status_prefix = '[GNUPG:] '
    # Status codes that describe a signature, and the class for each.
    sig_classes = {
        'VALIDSIG': ValidSig,
        'BADSIG': BadSig,
        'ERRSIG': ErrSig,
    }

    sigs = []
    for line in status_r:
        if not line.startswith(status_prefix):
            warn("Invalid output from GnuPG: %r", line)
            continue

        # Strip the line ending explicitly, so that an unterminated final
        # line does not lose its last real character.
        fields = line[len(status_prefix):].rstrip('\r\n').split(' ')
        sig_class = sig_classes.get(fields[0])
        if sig_class is not None:
            sigs.append(sig_class(fields[1:]))

    child.wait()

    try:
        errors.seek(0)
        error_messages = errors.read().strip()
    finally:
        errors.close()

    if not sigs:
        if error_messages:
            raise SafeException(_("No signatures found. Errors from GPG:\n%s") % error_messages)
        else:
            raise SafeException(_("No signatures found. No error messages from GPG."))
    elif error_messages:
        # Attach the (shared) messages to every signature found.
        for s in sigs:
            s.messages = error_messages

    return sigs
343