flask_login.utils 源代码

# -*- coding: utf-8 -*-
'''
    flask_login.utils
    -----------------
    General utilities.
'''


import hmac
from hashlib import sha512
from functools import wraps
from werkzeug.local import LocalProxy
from werkzeug.security import safe_str_cmp
from werkzeug.urls import url_decode, url_encode

from flask import (_request_ctx_stack, current_app, request, session, url_for,
                   has_request_context)

from ._compat import text_type, urlparse, urlunparse
from .config import COOKIE_NAME, EXEMPT_METHODS
from .signals import user_logged_in, user_logged_out, user_login_confirmed


#: 当前用户的代理对象。如果当前用户未登录,这将是一个匿名用户
current_user = LocalProxy(lambda: _get_user())


def encode_cookie(payload, key=None):
    '''
    This will encode a ``unicode`` value into a cookie, and sign that cookie
    with the app's secret key.

    :param payload: The value to encode, as `unicode`.
    :type payload: unicode

    :param key: The key to use when creating the cookie digest. If not
                specified, the SECRET_KEY value from app config will be used.
    :type key: str
    '''
    return u'{0}|{1}'.format(payload, _cookie_digest(payload, key=key))


def decode_cookie(cookie, key=None):
    '''
    This decodes a cookie given by `encode_cookie`. If verification of the
    cookie fails, ``None`` will be implicitly returned.

    :param cookie: An encoded cookie.
    :type cookie: str

    :param key: The key to use when creating the cookie digest. If not
                specified, the SECRET_KEY value from app config will be used.
    :type key: str
    '''
    try:
        payload, digest = cookie.rsplit(u'|', 1)
        if hasattr(digest, 'decode'):
            digest = digest.decode('ascii')  # pragma: no cover
    except ValueError:
        return

    if safe_str_cmp(_cookie_digest(payload, key=key), digest):
        return payload


def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url


def expand_login_view(login_view):
    '''
    Returns the url for the login view, expanding the view name to a url if
    needed.

    :param login_view: The name of the login view or a URL for the login view.
    :type login_view: str
    '''
    if login_view.startswith(('https://', 'http://', '/')):
        return login_view
    else:
        return url_for(login_view)


[文档]def login_url(login_view, next_url=None, next_field='next'): ''' 创建用于重定向到登录页面的 URL。如果只提供了 `login_view` 参数, 函数将仅返回该视图的 URL。如果提供了 `next_url` 参数,则会添加一个 ``next=URL`` 的参数到查询字符串,以便登录视图可以重定向到提供的这个 URL。 Flask-Login 默认的未认证处理器(unauthorized handler) 在重定向到登录 url 时使用这个函数。 将 `FORCE_HOST_FOR_REDIRECTS` 配置设置为一个主机地址, 即可强制在 URL 中使用主机名。 这可以防止在请求中有 Host 或 X-Forwarded-For 首部字段时重定向到外部站点。 :param login_view: 登录视图的名称(也可以是登录视图的实际 URL) :type login_view: str :param next_url: 提供给登录视图来重定向的URL。 :type next_url: str :param next_field: 存储下一个 URL 的字段名称。(默认为 ``next``) :type next_field: str ''' base = expand_login_view(login_view) if next_url is None: return base parsed_result = urlparse(base) md = url_decode(parsed_result.query) md[next_field] = make_next_param(base, next_url) netloc = current_app.config.get('FORCE_HOST_FOR_REDIRECTS') or \ parsed_result.netloc parsed_result = parsed_result._replace(netloc=netloc, query=url_encode(md, sort=True)) return urlunparse(parsed_result)
[文档]def login_fresh(): ''' 如果当前登录是“新鲜”的,返回 ``True``。 ''' return session.get('_fresh', False)
[文档]def login_user(user, remember=False, duration=None, force=False, fresh=True): ''' 登录用户。你应该在这个方法中传入实际的用户对象。 如果用户的 `is_active` 属性为 ``False``,他们将不会被登录,除非 `force` 参数为 ``True``。 如果登录成功将返回 ``True`` ,如果登录失败则返回 ``False`` (即用户的账号为不活跃状态)。 :param user: 要登录的用户对象。 :type user: object :param remember: session 过期后是否记住用户。默认值为 ``False``。 :type remember: bool :param duration: 记住我 cookie 的过期时长。如果值为 ``None`` 将使用配置中设置的值. 默认值为 ``None``。 :type duration: :class:`datetime.timedelta` :param force: 如果用户处于不活跃状态,设置这个参数为 ``True`` 将强制登录用户。默认值为 ``False``。 :type force: bool :param fresh: 将该参数设置为 ``False``,将会在登录用户时标记 session 为”不新鲜“。默认值为 ``True``。 :type fresh: bool ''' if not force and not user.is_active: return False user_id = getattr(user, current_app.login_manager.id_attribute)() session['user_id'] = user_id session['_fresh'] = fresh session['_id'] = current_app.login_manager._session_identifier_generator() if remember: session['remember'] = 'set' if duration is not None: try: # equal to timedelta.total_seconds() but works with Python 2.6 session['remember_seconds'] = (duration.microseconds + (duration.seconds + duration.days * 24 * 3600) * 10**6) / 10.0**6 except AttributeError: raise Exception('duration must be a datetime.timedelta, ' 'instead got: {0}'.format(duration)) current_app.login_manager._update_request_context_with_user(user) user_logged_in.send(current_app._get_current_object(), user=_get_user()) return True
[文档]def logout_user(): ''' 注销用户。(不需要传入实际的用户对象。)若存在记住我 cookie,该 cookie 将会被清除。 ''' user = _get_user() if 'user_id' in session: session.pop('user_id') if '_fresh' in session: session.pop('_fresh') if '_id' in session: session.pop('_id') cookie_name = current_app.config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME) if cookie_name in request.cookies: session['remember'] = 'clear' if 'remember_seconds' in session: session.pop('remember_seconds') user_logged_out.send(current_app._get_current_object(), user=user) current_app.login_manager._update_request_context_with_user() return True
[文档]def confirm_login(): ''' 将当前 session 设置为”新鲜“状态。当从 cookie 重新恢复时, session 会变得不新鲜。 ''' session['_fresh'] = True session['_id'] = current_app.login_manager._session_identifier_generator() user_login_confirmed.send(current_app._get_current_object())
[文档]def login_required(func): ''' 如果你用这个函数装饰一个视图,它将确保当前用户在调用实际的视图之前已经通过认证并登录。 (如果他们没有,它将会调用 :attr:`LoginManager.unauthorized` 回调函数)。例如:: @app.route('/post') @login_required def post(): pass 如果只在特定场景你需要要求用户已经登录,你可以这样做:: if not current_user.is_authenticated: return current_app.login_manager.unauthorized() ...这基本上也是这个函数装饰视图时会额外执行的代码。 我们可以很方便在进行单元测试时全局地关闭认证功能。 如果应用程序的配置变量 `LOGIN_DISABLED` 被设置为 `True`,这个装饰器将会被忽略。 .. Note :: 根据 `W3 guidelines for CORS preflight requests <http://www.w3.org/TR/cors/#cross-origin-request-with-preflight-0>`_, ``OPTIONS`` 类型的 HTTP 请求不会进行登录检查。 :param func: 要装饰的视图函数。 :type func: function ''' @wraps(func) def decorated_view(*args, **kwargs): if request.method in EXEMPT_METHODS: return func(*args, **kwargs) elif current_app.config.get('LOGIN_DISABLED'): return func(*args, **kwargs) elif not current_user.is_authenticated: return current_app.login_manager.unauthorized() return func(*args, **kwargs) return decorated_view
[文档]def fresh_login_required(func): ''' 如果你用这个函数装饰视图,它将确保当前用户的登录是”新鲜“的 - 即他们 的 session 不是从“记住我” cookie 中恢复的。 像改变密码或者邮箱这样的敏感操作应该用这个来保护,以提防 cookie 窃贼的攻击。 如果用户没有通过认证,:meth:`LoginManager.unauthorized` 像平常一样会被调用。 如果他们已经通过认证,但 session 是”不新鲜“的, 它将调用 :meth:`LoginManager.needs_refresh`。(在这种情况下,你将需要提供 一个 :attr:`LoginManager.refresh_view`。) 关于配置变量,该装饰器和 :func:`login_required` 装饰器有同样的行为。 .. Note :: 根据 `W3 guidelines for CORS preflight requests <http://www.w3.org/TR/cors/#cross-origin-request-with-preflight-0>`_, ``OPTIONS`` 类型的 HTTP 请求不会进行登录检查。 :param func: 要装饰的视图函数。 :type func: function ''' @wraps(func) def decorated_view(*args, **kwargs): if request.method in EXEMPT_METHODS: return func(*args, **kwargs) elif current_app.config.get('LOGIN_DISABLED'): return func(*args, **kwargs) elif not current_user.is_authenticated: return current_app.login_manager.unauthorized() elif not login_fresh(): return current_app.login_manager.needs_refresh() return func(*args, **kwargs) return decorated_view
def set_login_view(login_view, blueprint=None): ''' Sets the login view for the app or blueprint. If a blueprint is passed, the login view is set for this blueprint on ``blueprint_login_views``. :param login_view: The user object to log in. :type login_view: str :param blueprint: The blueprint which this login view should be set on. Defaults to ``None``. :type blueprint: object ''' num_login_views = len(current_app.login_manager.blueprint_login_views) if blueprint is not None or num_login_views != 0: (current_app.login_manager .blueprint_login_views[blueprint.name]) = login_view if (current_app.login_manager.login_view is not None and None not in current_app.login_manager.blueprint_login_views): (current_app.login_manager .blueprint_login_views[None]) = (current_app.login_manager .login_view) current_app.login_manager.login_view = None else: current_app.login_manager.login_view = login_view def _get_user(): if has_request_context() and not hasattr(_request_ctx_stack.top, 'user'): current_app.login_manager._load_user() return getattr(_request_ctx_stack.top, 'user', None) def _cookie_digest(payload, key=None): key = _secret_key(key) return hmac.new(key, payload.encode('utf-8'), sha512).hexdigest() def _get_remote_addr(): address = request.headers.get('X-Forwarded-For', request.remote_addr) if address is not None: # An 'X-Forwarded-For' header includes a comma separated list of the # addresses, the first address being the actual remote address. address = address.encode('utf-8').split(b',')[0].strip() return address def _create_identifier(): user_agent = request.headers.get('User-Agent') if user_agent is not None: user_agent = user_agent.encode('utf-8') base = '{0}|{1}'.format(_get_remote_addr(), user_agent) if str is bytes: base = text_type(base, 'utf-8', errors='replace') # pragma: no cover h = sha512() h.update(base.encode('utf8')) return h.hexdigest() def _user_context_processor(): return dict(current_user=_get_user()) def _secret_key(key=None): if key is None: key = current_app.config['SECRET_KEY'] if isinstance(key, text_type): # pragma: no cover key = key.encode('latin1') # ensure bytes return key