| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """Miscellaneous essential routines. |
| |
| This includes actual message transmission routines, address checking and |
| message and address munging, a handy-dandy routine to map a function on all |
| the mailing lists, and whatever else doesn't belong elsewhere. |
| |
| """ |
| |
| from __future__ import nested_scopes |
| |
| import os |
| import re |
| import cgi |
| import sha |
| import time |
| import errno |
| import base64 |
| import random |
| import urlparse |
| import htmlentitydefs |
| import email.Header |
| import email.Iterators |
| from email.Errors import HeaderParseError |
| from types import UnicodeType |
| from string import whitespace, digits |
| try: |
| |
| from string import ascii_letters |
| except ImportError: |
| |
| _lower = 'abcdefghijklmnopqrstuvwxyz' |
| ascii_letters = _lower + _lower.upper() |
| |
| from Mailman import mm_cfg |
| from Mailman import Errors |
| from Mailman import Site |
| from Mailman.SafeDict import SafeDict |
| from Mailman.Logging.Syslog import syslog |
| |
| try: |
| True, False |
| except NameError: |
| True = 1 |
| False = 0 |
| |
# Frequently used string constants.
NL = '\n'
DOT = '.'
EMPTYSTRING = ''
UEMPTYSTRING = u''
# Every character that may appear in an interpolation identifier.
IDENTCHARS = ascii_letters + digits + '_'

# Matches a %-string placeholder such as %(name)s; the trailing `s' is
# optional and the identifier is captured as group 1.
cre = re.compile(r'%\(([_a-z]\w*?)\)s?', re.I)

# Matches a $-string token.  Group 1 is an escaped `$$', group 2 is the
# identifier of a bare $name, and group 3 the identifier of a braced ${name}.
dre = re.compile(r'(\${2})|\$([_a-z]\w*)|\${([_a-z]\w*)}', re.I)
| |
| |
| |
def list_exists(listname):
    """Return true iff list `listname' exists.

    A list is considered to exist when its directory (as reported by
    Site.get_listpath()) contains at least one of the known configuration
    database files: config.pck, config.pck.last, config.db or
    config.db.last.
    """
    listdir = Site.get_listpath(listname)
    suffixes = ('.pck', '.pck.last', '.db', '.db.last')
    for suffix in suffixes:
        candidate = os.path.join(listdir, 'config' + suffix)
        if os.path.exists(candidate):
            return True
    return False
| |
| |
def list_names():
    """Return the names of all lists in default list directory.

    This simply delegates to Site.get_listnames().
    """
    names = Site.get_listnames()
    return names
| |
| |
| |
| |
def wrap(text, column=70, honor_leading_ws=True):
    """Wrap and fill the text to the specified column.

    Paragraphs are separated by a blank line.  Within a paragraph,
    consecutive lines are joined (filled) with a single space, except that
    when honor_leading_ws is true a line beginning with whitespace is never
    joined with its neighbors.  Each resulting line longer than `column'
    characters is then broken at the last whitespace at or before `column';
    if there is none (a word longer than `column'), the line is broken at
    the next whitespace after `column' instead.

    Returns the wrapped text as a single string.
    """
    output = ''
    # Paragraphs are delimited by an empty line.
    for para in re.split('\n\n', text):
        # Pass 1: fill.  Join adjacent fillable lines into one long line.
        filled = []
        prev_fillable = False
        for raw in para.split('\n'):
            if not raw:
                # Keep empty lines as they are; they do not change the
                # fill state carried over from the previous line.
                filled.append(raw)
                continue
            fillable = not (honor_leading_ws and raw[0] in whitespace)
            if prev_fillable and fillable:
                # Continue the previous line, separated by one space.
                filled[-1] = filled[-1].rstrip() + ' ' + raw
            else:
                # Retain the line break before this line.
                filled.append(raw)
            prev_fillable = fillable
        # Pass 2: wrap.  Chop each filled line into pieces.
        for remaining in filled:
            while remaining:
                if len(remaining) <= column:
                    piece = remaining
                    remaining = ''
                else:
                    # Scan left from `column' for a whitespace character.
                    brk = column
                    while brk > 0 and remaining[brk] not in whitespace:
                        brk -= 1
                    # Then skip left over that run of whitespace to find
                    # the last character to keep on this piece.
                    end = brk
                    while end > 0 and remaining[end] in whitespace:
                        end -= 1
                    if end == 0:
                        # No usable break before `column': break at the
                        # first whitespace after it instead.
                        end = column
                        while (end < len(remaining)
                               and remaining[end] not in whitespace):
                            end += 1
                        brk = end
                        while (brk < len(remaining)
                               and remaining[brk] in whitespace):
                            brk += 1
                        brk -= 1
                    piece = remaining[:end+1] + '\n'
                    # Resume at the next non-whitespace character.
                    brk += 1
                    while (brk < len(remaining)
                           and remaining[brk] in whitespace):
                        brk += 1
                    remaining = remaining[brk:]
                output += piece
            output += '\n'
        output += '\n'
    # The final two newlines are artifacts of the loops above.
    return output[:-2]
| |
| |
| |
def QuotePeriods(text):
    """Return `text' with every interior line consisting of a lone period
    quoted by prefixing it with a space.

    A line is only quoted when it is both preceded and followed by a
    newline, so a lone period on the very first or very last line is left
    untouched.
    """
    # A lookahead is used for the trailing newline so that it is not
    # consumed by the match; otherwise the second of two consecutive lone
    # period lines would be skipped.
    return re.sub(r'\n\.(?=\n)', '\n .', text)
| |
| |
| |
def ParseEmail(email):
    """Split a lowercased email address into user and domain parts.

    Returns a 2-tuple (user, domain) where `domain' is the list of
    dot-separated labels following the first `@'.  If there is no `@', or
    it is the first character, returns (lowercased address, None).
    """
    lowered = email.lower()
    pos = lowered.find('@')
    if pos >= 1:
        return lowered[:pos], lowered[pos+1:].split('.')
    return lowered, None
| |
| |
def LCDomain(addr):
    """Return the address with the domain part lowercased.

    Everything after the first `@' is lowercased; the part before it is
    left alone.  An address without an `@' is returned unchanged.
    """
    split_at = addr.find('@')
    if split_at < 0:
        return addr
    localpart = addr[:split_at]
    domain = addr[split_at+1:]
    return localpart + '@' + domain.lower()
| |
| |
| |
# Characters that are never acceptable anywhere in an address: brackets,
# parentheses, angle brackets, `|', `;', `^', `,', control characters and
# anything from \177 upward.
_badchars = re.compile(r'[][()<>|;^,\000-\037\177-\377]')

def ValidateEmail(s):
    """Verify that an email address isn't grossly evil.

    Returns None when the address passes all checks.  Raises
    Errors.MMBadEmailError when the address is empty, contains a space, or
    has fewer than two dot-separated labels after the `@'.  Raises
    Errors.MMHostileAddress when it contains a forbidden character or
    begins with a dash.
    """
    if not s or ' ' in s:
        raise Errors.MMBadEmailError
    if s.startswith('-') or _badchars.search(s):
        raise Errors.MMHostileAddress(s)
    user, domain_parts = ParseEmail(s)
    # A missing domain and a single-label domain are both rejected.
    if not domain_parts or len(domain_parts) < 2:
        raise Errors.MMBadEmailError(s)
| |
| |
| |
| |
| |
# Matches any character outside the printable, non-space ASCII range.
CRNLpat = re.compile(r'[^\x21-\x7e]')

def GetPathPieces(envar='PATH_INFO'):
    """Return the non-empty `/'-separated components of a path taken from
    the environment variable `envar'.

    Returns None when the variable is unset or empty.  If the path
    contains a character matched by CRNLpat, only the text before the
    first such character is used and an error is logged.
    """
    path = os.environ.get(envar)
    if not path:
        return None
    if CRNLpat.search(path):
        path = CRNLpat.split(path)[0]
        syslog('error', 'Warning: Possible malformed path attack.')
    pieces = []
    for piece in path.split('/'):
        if piece:
            pieces.append(piece)
    return pieces
| |
| |
| |
def ScriptURL(target, web_page_url=None, absolute=False):
    """Return the URL of the CGI script named `target'.

    target - scriptname only, nothing extra
    web_page_url - the list's configvar of the same name; when None it is
        built from mm_cfg.DEFAULT_URL_PATTERN and get_domain(), and a
        trailing slash is added if missing
    absolute - a flag which if set, generates an absolute url

    Unless `absolute' is set, a relative URL (a run of `../' followed by
    the target) is returned when the URL of the current request lies
    underneath the path of web_page_url.  mm_cfg.CGIEXT is always appended.
    """
    if web_page_url is None:
        web_page_url = mm_cfg.DEFAULT_URL_PATTERN % get_domain()
        if web_page_url[-1] != '/':
            web_page_url = web_page_url + '/'
    fullpath = os.environ.get('REQUEST_URI')
    if fullpath is None:
        fullpath = os.environ.get('SCRIPT_NAME', '') + \
                   os.environ.get('PATH_INFO', '')
    baseurl = urlparse.urlparse(web_page_url)[2]
    # The base path is stripped from the front of the request path below,
    # so it must be tested as a prefix, not as a suffix.
    if not absolute and fullpath.startswith(baseurl):
        # Use relative addressing: climb one directory for every slash
        # remaining in the path portion (the query string is ignored).
        fullpath = fullpath[len(baseurl):]
        i = fullpath.find('?')
        if i > 0:
            count = fullpath.count('/', 0, i)
        else:
            count = fullpath.count('/')
        path = ('../' * count) + target
    else:
        path = web_page_url + target
    return path + mm_cfg.CGIEXT
| |
| |
| |
def GetPossibleMatchingAddrs(name):
    """Return a list of addresses that could possibly match a given name.

    The lowercased name comes first, followed by the same user at each
    parent domain that still has at least two labels.

    For example, given scott@pobox.com, return ['scott@pobox.com'],
    given scott@blackbox.pobox.com return ['scott@blackbox.pobox.com',
    'scott@pobox.com']
    """
    name = name.lower()
    user, labels = ParseEmail(name)
    matches = [name]
    if labels:
        # Drop leading labels one at a time, always keeping two or more.
        for start in range(1, len(labels) - 1):
            matches.append("%s@%s" % (user, DOT.join(labels[start:])))
    return matches
| |
| |
| |
def List2Dict(L, foldcase=False):
    """Return a dict keyed by the entries in the list passed to it.

    Every value in the dict is true.  When `foldcase' is true the keys are
    the lowercased entries.
    """
    result = {}
    for entry in L:
        if foldcase:
            entry = entry.lower()
        result[entry] = True
    return result
| |
| |
| |
_vowels = ('a', 'e', 'i', 'o', 'u')
_consonants = ('b', 'c', 'd', 'f', 'g', 'h', 'k', 'm', 'n',
               'p', 'r', 's', 't', 'v', 'w', 'x', 'z')
_syllables = []

# Build every consonant+vowel and vowel+consonant pair once at import time,
# then remove the loop variables from the module namespace.
for v in _vowels:
    for c in _consonants:
        _syllables.extend([c + v, v + c])
del c, v

def UserFriendly_MakeRandomPassword(length):
    """Return a password of `length' characters made of randomly chosen
    two-letter syllables.

    The choice is made with the `random' module.
    """
    chosen = []
    while 2 * len(chosen) < length:
        chosen.append(random.choice(_syllables))
    password = ''.join(chosen)
    # An odd length leaves one character too many; trim it.
    return password[:length]
| |
| |
| def Secure_MakeRandomPassword(length): |
| bytesread = 0 |
| bytes = [] |
| fd = None |
| try: |
| while bytesread < length: |
| try: |
| |
| newbytes = os.urandom(length - bytesread) |
| except (AttributeError, NotImplementedError): |
| if fd is None: |
| try: |
| fd = os.open('/dev/urandom', os.O_RDONLY) |
| except OSError, e: |
| if e.errno <> errno.ENOENT: |
| raise |
| |
| |
| |
| syslog('error', |
| 'urandom not available, passwords not secure') |
| return UserFriendly_MakeRandomPassword(length) |
| newbytes = os.read(fd, length - bytesread) |
| bytes.append(newbytes) |
| bytesread += len(newbytes) |
| s = base64.encodestring(EMPTYSTRING.join(bytes)) |
| |
| return s.replace('\n', '')[:length] |
| finally: |
| if fd is not None: |
| os.close(fd) |
| |
| |
def MakeRandomPassword(length=mm_cfg.MEMBER_PASSWORD_LENGTH):
    """Return a new random password of `length' characters.

    mm_cfg.USER_FRIENDLY_PASSWORDS selects between the syllable based
    generator and the one based on the system random source.
    """
    if mm_cfg.USER_FRIENDLY_PASSWORDS:
        generator = UserFriendly_MakeRandomPassword
    else:
        generator = Secure_MakeRandomPassword
    return generator(length)
| |
| |
def GetRandomSeed():
    """Return a string of two randomly chosen ASCII letters."""
    letters = []
    for unused in (0, 1):
        index = int(random.random() * 52)
        if index < 26:
            # 0..25 map to the uppercase letters (65 is ord('A')).
            offset = 65
        else:
            # 26..51 map to the lowercase letters (26 + 71 is ord('a')).
            offset = 71
        letters.append(chr(index + offset))
    return ''.join(letters)
| |
| |
| |
| def set_global_password(pw, siteadmin=True): |
| if siteadmin: |
| filename = mm_cfg.SITE_PW_FILE |
| else: |
| filename = mm_cfg.LISTCREATOR_PW_FILE |
| |
| omask = os.umask(026) |
| try: |
| fp = open(filename, 'w') |
| fp.write(sha.new(pw).hexdigest() + '\n') |
| fp.close() |
| finally: |
| os.umask(omask) |
| |
| |
| def get_global_password(siteadmin=True): |
| if siteadmin: |
| filename = mm_cfg.SITE_PW_FILE |
| else: |
| filename = mm_cfg.LISTCREATOR_PW_FILE |
| try: |
| fp = open(filename) |
| challenge = fp.read()[:-1] |
| fp.close() |
| except IOError, e: |
| if e.errno <> errno.ENOENT: raise |
| |
| return None |
| return challenge |
| |
| |
def check_global_password(response, siteadmin=True):
    """Check `response' against the stored global password.

    Returns None when no password is stored; otherwise returns whether the
    SHA hex digest of `response' equals the stored digest.
    """
    expected = get_global_password(siteadmin)
    if expected is not None:
        return expected == sha.new(response).hexdigest()
    return None
| |
| |
| |
| def websafe(s): |
| return cgi.escape(s, quote=True) |
| |
| |
def nntpsplit(s):
    """Split a `host:port' string into a (host, port) tuple.

    When there is no colon, or the text after the first colon is not an
    integer, the whole string is returned as the host with port 119.
    """
    colon = s.find(':')
    if colon >= 0:
        try:
            port = int(s[colon+1:])
        except ValueError:
            pass
        else:
            return s[:colon], port
    # Fall back to the default port.
    return s, 119
| |
| |
| |
| |
| |
def ObscureEmail(addr, for_text=False):
    """Make email address unrecognizable to web spiders, but invertable.

    Every `@' becomes `--at--'.  When the for_text option is set (not the
    default), ` at ' is used instead so the result reads as a sentence
    fragment rather than a token.
    """
    if for_text:
        substitute = ' at '
    else:
        substitute = '--at--'
    return addr.replace('@', substitute)
| |
def UnobscureEmail(addr):
    """Invert ObscureEmail() conversion.

    Only the `--at--' token form is converted back to `@'.
    """
    restored = addr.replace('--at--', '@')
    return restored
| |
| |
| |
class OuterExit(Exception):
    """Raised to jump out of nested loops in a single step."""
| |
| def findtext(templatefile, dict=None, raw=False, lang=None, mlist=None): |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| languages = [] |
| if lang is not None: |
| languages.append(lang) |
| if mlist is not None: |
| languages.append(mlist.preferred_language) |
| languages.append(mm_cfg.DEFAULT_SERVER_LANGUAGE) |
| |
| searchdirs = [] |
| if mlist is not None: |
| searchdirs.append(mlist.fullpath()) |
| searchdirs.append(os.path.join(mm_cfg.TEMPLATE_DIR, mlist.host_name)) |
| searchdirs.append(os.path.join(mm_cfg.TEMPLATE_DIR, 'site')) |
| searchdirs.append(mm_cfg.TEMPLATE_DIR) |
| |
| fp = None |
| try: |
| for lang in languages: |
| for dir in searchdirs: |
| filename = os.path.join(dir, lang, templatefile) |
| try: |
| fp = open(filename) |
| raise OuterExit |
| except IOError, e: |
| if e.errno <> errno.ENOENT: raise |
| |
| fp = None |
| except OuterExit: |
| pass |
| if fp is None: |
| |
| |
| try: |
| filename = os.path.join(mm_cfg.TEMPLATE_DIR, 'en', templatefile) |
| fp = open(filename) |
| except IOError, e: |
| if e.errno <> errno.ENOENT: raise |
| |
| raise IOError(errno.ENOENT, 'No template file found', templatefile) |
| template = fp.read() |
| fp.close() |
| text = template |
| if dict is not None: |
| try: |
| sdict = SafeDict(dict) |
| try: |
| text = sdict.interpolate(template) |
| except UnicodeError: |
| |
| utemplate = unicode(template, GetCharSet(lang), 'replace') |
| text = sdict.interpolate(utemplate) |
| except (TypeError, ValueError), e: |
| |
| syslog('error', 'broken template: %s\n%s', filename, e) |
| pass |
| if raw: |
| return text, filename |
| return wrap(text), filename |
| |
| |
def maketext(templatefile, dict=None, raw=False, lang=None, mlist=None):
    """Return only the text part of findtext()'s result.

    All arguments are passed to findtext() unchanged.
    """
    found = findtext(templatefile, dict, raw, lang, mlist)
    return found[0]
| |
| |
| |
# Administrative command keywords, each mapped to a 2-tuple of
# (minimum number of arguments, maximum number of arguments).
ADMINDATA = {
    'confirm':     (1, 1),
    'help':        (0, 0),
    'info':        (0, 0),
    'lists':       (0, 0),
    'options':     (0, 0),
    'password':    (2, 2),
    'remove':      (0, 0),
    'set':         (3, 3),
    'subscribe':   (0, 3),
    'unsubscribe': (0, 1),
    'who':         (0, 0),
    }
| |
| |
| |
| |
def is_administrivia(msg):
    """Return true if `msg' looks like an administrative request.

    The body is read up to a signature separator line.  False is returned
    as soon as the body has more than
    mm_cfg.DEFAULT_MAIL_COMMANDS_MAX_LINES non-blank lines.  Otherwise the
    message is administrivia when the whole body is a single ADMINDATA
    keyword, or when one of the first five body lines or the Subject:
    header starts with an ADMINDATA keyword followed by an acceptable
    number of arguments.
    """
    linecnt = 0
    lines = []
    for line in email.Iterators.body_line_iterator(msg):
        # Stop at the signature separator.  Body lines are expected to
        # still carry their line terminator, so the separator is matched
        # both with and without one.
        if line in ('-- ', '-- \n', '-- \r\n'):
            break
        if line.strip():
            linecnt += 1
        if linecnt > mm_cfg.DEFAULT_MAIL_COMMANDS_MAX_LINES:
            return False
        lines.append(line)
    bodytext = NL.join(lines)
    # See if the body text is just one word, and that word is a keyword.
    if ADMINDATA.has_key(bodytext.strip().lower()):
        return True
    # Look at the first five body lines plus the subject.  The Subject:
    # header is passed through str() in case it is not a plain string.
    bodylines = lines[:5]
    subject = str(msg.get('subject', ''))
    bodylines.append(subject)
    for line in bodylines:
        if not line.strip():
            continue
        words = [word.lower() for word in line.split()]
        minargs, maxargs = ADMINDATA.get(words[0], (None, None))
        if minargs is None and maxargs is None:
            continue
        if minargs <= len(words[1:]) <= maxargs:
            # `set' only counts when its second argument is on or off.
            if words[0] == 'set' and words[2] not in ('on', 'off'):
                continue
            return True
    return False
| |
| |
| |
def GetRequestURI(fallback=None, escape=True):
    """Return the full virtual path this CGI script was invoked with.

    Newer web servers seems to supply this info in the REQUEST_URI
    environment variable -- which isn't part of the CGI/1.1 spec.
    Thus, if REQUEST_URI isn't available, we concatenate SCRIPT_NAME
    and PATH_INFO, both of which are part of CGI/1.1.

    Optional argument `fallback' (default `None') is returned if both of
    the above methods fail.

    The url will be cgi escaped to prevent cross-site scripting attacks,
    unless `escape' is set to 0.  A result of None is returned as is and
    never escaped.
    """
    url = os.environ.get('REQUEST_URI')
    if url is None:
        script_name = os.environ.get('SCRIPT_NAME')
        path_info = os.environ.get('PATH_INFO')
        if script_name is not None and path_info is not None:
            url = script_name + path_info
        else:
            url = fallback
    # There is nothing to escape when no URL could be determined and no
    # fallback was supplied.
    if escape and url is not None:
        return websafe(url)
    return url
| |
| |
| |
| def reap(kids, func=None, once=False): |
| while kids: |
| if func: |
| func() |
| try: |
| pid, status = os.waitpid(-1, os.WNOHANG) |
| except OSError, e: |
| |
| if e.errno <> errno.ECHILD: |
| raise |
| kids.clear() |
| break |
| if pid <> 0: |
| try: |
| del kids[pid] |
| except KeyError: |
| |
| pass |
| if once: |
| break |
| |
| |
def GetLanguageDescr(lang):
    """Return the first element of the mm_cfg.LC_DESCRIPTIONS entry for
    `lang'.

    Raises KeyError for a language that has no entry.
    """
    entry = mm_cfg.LC_DESCRIPTIONS[lang]
    return entry[0]
| |
| |
def GetCharSet(lang):
    """Return the second element of the mm_cfg.LC_DESCRIPTIONS entry for
    `lang', which the rest of this module uses as a character set name.

    Raises KeyError for a language that has no entry.
    """
    entry = mm_cfg.LC_DESCRIPTIONS[lang]
    return entry[1]
| |
def IsLanguage(lang):
    """Return true if `lang' has an entry in mm_cfg.LC_DESCRIPTIONS."""
    known = mm_cfg.LC_DESCRIPTIONS
    return known.has_key(lang)
| |
| |
| |
def get_domain():
    """Return the lowercased host name to use for this request.

    The host is taken from the HTTP_HOST environment variable, falling
    back to SERVER_NAME, with a trailing `:<SERVER_PORT>' removed.  It is
    used only when mm_cfg.VIRTUAL_HOST_OVERVIEW is true and a host was
    found; otherwise the host part of mm_cfg.DEFAULT_URL is returned, or
    mm_cfg.DEFAULT_URL_HOST when that is empty.
    """
    host = os.environ.get('HTTP_HOST', os.environ.get('SERVER_NAME'))
    port = os.environ.get('SERVER_PORT')
    # Strip off the port if there is one.  `host' is None when neither
    # environment variable is set, so it is checked before being used.
    if host and port and host.endswith(':' + port):
        host = host[:-len(port)-1]
    if mm_cfg.VIRTUAL_HOST_OVERVIEW and host:
        return host.lower()
    else:
        # See the note in Defaults.py concerning DEFAULT_URL
        # vs. DEFAULT_URL_HOST.
        hostname = ((mm_cfg.DEFAULT_URL
                     and urlparse.urlparse(mm_cfg.DEFAULT_URL)[1])
                    or mm_cfg.DEFAULT_URL_HOST)
        return hostname.lower()
| |
| |
def get_site_email(hostname=None, extra=None):
    """Return the address of the site list.

    The local part is mm_cfg.MAILMAN_SITE_LIST, with `-<extra>' appended
    when `extra' is given.  When `hostname' is None, the result of
    get_domain() is looked up in mm_cfg.VIRTUAL_HOSTS, and used as is if
    it has no entry there.
    """
    if hostname is None:
        domain = get_domain()
        hostname = mm_cfg.VIRTUAL_HOSTS.get(domain, domain)
    localpart = mm_cfg.MAILMAN_SITE_LIST
    if extra is not None:
        localpart = '%s-%s' % (localpart, extra)
    return '%s@%s' % (localpart, hostname)
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
# Per-process counter that keeps ids generated within the same second
# distinct from one another.
_serial = 0
def unique_message_id(mlist):
    """Return a new Message-ID style string for `mlist'.

    The id combines a per-process serial number, the current time, the
    process id, the list's internal name and the list's host name.
    """
    global _serial
    fields = (_serial, time.time(), os.getpid(),
              mlist.internal_name(), mlist.host_name)
    msgid = '<mailman.%d.%d.%d.%s@%s>' % fields
    # Only advance the counter once the id has actually been built.
    _serial = _serial + 1
    return msgid
| |
| |
| |
| |
def midnight(date=None):
    """Return the time.mktime() value for 00:00:00 on the given day.

    `date' is a (year, month, day) tuple; today's local date is used when
    it is None.
    """
    if date is None:
        date = time.localtime()[:3]
    # Zero out the time-of-day fields; the final -1 leaves it to the
    # library to decide whether daylight saving time applies.
    timetuple = date + (0, 0, 0, 0, 0, -1)
    return time.mktime(timetuple)
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
def to_dollar(s):
    """Convert from %-strings to $-strings.

    Literal dollar signs are doubled and `%%' becomes `%'.  Each
    %(name)s placeholder becomes $name, or ${name} when the character
    that follows it could otherwise be read as part of the identifier.
    """
    s = s.replace('$', '$$').replace('%%', '%')
    # Splitting on a pattern with one group yields alternating literal
    # text and identifiers: [text, name, text, name, ..., text].
    parts = cre.split(s)
    converted = [parts[0]]
    for i in range(1, len(parts), 2):
        name = parts[i]
        following = parts[i+1]
        if following and following[0] in IDENTCHARS:
            converted.append('${' + name + '}')
        else:
            converted.append('$' + name)
        converted.append(following)
    return EMPTYSTRING.join(converted)
| |
| |
def to_percent(s):
    """Convert from $-strings to %-strings.

    Literal percent signs are doubled and `$$' becomes `$'.  Each $name
    or ${name} placeholder becomes %(name)s.
    """
    s = s.replace('%', '%%').replace('$$', '$')
    # Splitting on a pattern with three groups yields runs of
    # [text, escaped, bare, braced, text, ...]; exactly one of the three
    # groups is set for every match.
    parts = dre.split(s)
    converted = [parts[0]]
    for i in range(1, len(parts), 4):
        escaped, bare, braced = parts[i:i+3]
        if escaped is not None:
            converted.append('$')
        elif bare is not None:
            converted.append('%(' + bare + ')s')
        else:
            converted.append('%(' + braced + ')s')
        converted.append(parts[i+3])
    return EMPTYSTRING.join(converted)
| |
| |
def dollar_identifiers(s):
    """Return the set (dictionary) of identifiers found in a $-string.

    The keys are the identifiers of every $name and ${name} placeholder;
    every value is true.
    """
    found = {}
    for escaped, bare, braced in dre.findall(s):
        # An escaped `$$' match sets neither identifier group.
        name = bare or braced
        if name:
            found[name] = True
    return found
| |
| |
def percent_identifiers(s):
    """Return the set (dictionary) of identifiers found in a %-string.

    The keys are the identifiers of every %(name)s placeholder; every
    value is true.
    """
    return List2Dict(cre.findall(s))
| |
| |
| |
| |
| |
def canonstr(s, lang=None):
    """Return `s' as a unicode string with HTML character references
    expanded.

    Decimal numeric references (&#NNN;) and named entities known to
    htmlentitydefs are replaced by the character they stand for.  A
    numeric reference that cannot be converted is left in the text as
    written; an unknown entity name becomes `?'.

    Byte strings are decoded using the character set of `lang', with
    undecodable bytes replaced.  iso-8859-1 is used when `lang' is None or
    its character set is us-ascii, so that non-ASCII characters survive
    even for English.
    """
    def tounicode(text):
        # Everything is assembled as unicode so that byte strings and
        # expanded references can always be joined safely.
        if isinstance(text, UnicodeType):
            return text
        if lang is None:
            charset = 'iso-8859-1'
        else:
            charset = GetCharSet(lang)
            if charset == 'us-ascii':
                charset = 'iso-8859-1'
        return unicode(text, charset, 'replace')

    newparts = []
    # The split alternates literal text with the reference names found
    # between `&' and `;'.
    parts = re.split(r'&(?P<ref>[^;]+);', s)
    while True:
        newparts.append(tounicode(parts.pop(0)))
        if not parts:
            break
        ref = parts.pop(0)
        if ref.startswith('#'):
            try:
                newparts.append(unichr(int(ref[1:])))
            except (ValueError, OverflowError):
                # Not convertible; keep the reference as it was written.
                newparts.append(tounicode('&' + ref + ';'))
        else:
            c = htmlentitydefs.entitydefs.get(ref, '?')
            if c.startswith('&#') and c.endswith(';'):
                # The table gives this entity as a numeric reference;
                # expand the number it contains.
                newparts.append(unichr(int(c[2:-1])))
            else:
                # Otherwise the table value is taken to be a single
                # Latin-1 character (or the `?' placeholder).
                newparts.append(unicode(c, 'iso-8859-1'))
    return UEMPTYSTRING.join(newparts)
| |
| |
| |
| |
| |
| |
def uncanonstr(s, lang=None):
    """Return `s' as a byte string suitable for the character set of
    `lang' (us-ascii when `lang' is None).

    A unicode string is encoded in that character set.  A byte string is
    returned unchanged provided it can be decoded in that character set.
    In either case, if the conversion fails the result of uquote(s) is
    returned instead.  None is treated as the empty string.
    """
    if s is None:
        s = u''
    charset = 'us-ascii'
    if lang is not None:
        charset = GetCharSet(lang)
    try:
        if not isinstance(s, UnicodeType):
            # Decode purely to check validity; the result is not needed.
            unicode(s, charset)
            return s
        return s.encode(charset)
    except UnicodeError:
        # Fall back to numeric character references.
        return uquote(s)
| |
| |
def uquote(s):
    """Return `s' as a plain string in which every character with an
    ordinal above 127 is replaced by a decimal `&#NNN;' reference.
    """
    pieces = []
    for ch in s:
        code = ord(ch)
        if code <= 127:
            pieces.append(ch)
        else:
            pieces.append('&#%3d;' % code)
    # Everything left is 7-bit, so this conversion cannot fail.
    return str(''.join(pieces))
| |
| |
def oneline(s, cset):
    """Return the header value `s' decoded, collapsed onto a single line
    and encoded in the character set `cset'.

    Characters that cannot be encoded are replaced.  If the header cannot
    be decoded or converted at all, `s' is returned with only its line
    breaks removed.
    """
    try:
        decoded = email.Header.decode_header(s)
        header = email.Header.make_header(decoded)
        flattened = UEMPTYSTRING.join(header.__unicode__().splitlines())
        return flattened.encode(cset, 'replace')
    except (LookupError, UnicodeError, ValueError, HeaderParseError):
        # Possibly a charset problem; fall back to the undecoded text.
        return EMPTYSTRING.join(s.splitlines())