# Example 1: answer "/" with a plain-text body and HTTP status 418.
from sanic import Sanic
from sanic import response as res

app = Sanic(__name__)


@app.route("/")
async def test(req):
    """Reply with a fixed plain-text body and status code 418."""
    teapot_reply = res.text("I\'m a teapot", status=418)
    return teapot_reply


if __name__ == '__main__':
    app.run(host="", port=8000)


# Example 2: answer "/" with a JSON document.
from sanic import Sanic
from sanic import response

app = Sanic(__name__)


@app.route("/")
async def test(request):
    """Reply with the JSON object {"test": true}."""
    payload = {"test": True}
    return response.json(payload)


if __name__ == '__main__':
    app.run(host="", port=8000)
简单的应用程序与 Sanic Views
from sanic import Sanic
from sanic.views import HTTPMethodView
from sanic.response import text

app = Sanic('some_name')


class SimpleView(HTTPMethodView):
    """Class-based view whose HTTP verb handlers are ordinary methods."""

    def get(self, request):
        """Handle GET."""
        return text('I am get method')

    def post(self, request):
        """Handle POST."""
        return text('I am post method')

    def put(self, request):
        """Handle PUT."""
        return text('I am put method')

    def patch(self, request):
        """Handle PATCH."""
        return text('I am patch method')

    def delete(self, request):
        """Handle DELETE."""
        return text('I am delete method')


class SimpleAsyncView(HTTPMethodView):
    """Class-based view whose HTTP verb handlers are coroutines."""

    async def get(self, request):
        """Handle GET."""
        return text('I am async get method')

    async def post(self, request):
        """Handle POST."""
        return text('I am async post method')

    async def put(self, request):
        """Handle PUT."""
        return text('I am async put method')


# Mount each view class on its own URL.
sync_view = SimpleView.as_view()
app.add_route(sync_view, '/')
async_view = SimpleAsyncView.as_view()
app.add_route(async_view, '/async')

if __name__ == '__main__':
    app.run(host="", port=8000, debug=True)
from sanic import Sanic
from sanic import response

app = Sanic(__name__)


@app.route('/')
def handle_request(request):
    """Redirect requests for "/" to "/redirect"."""
    target = '/redirect'
    return response.redirect(target)


@app.route('/redirect')
async def test(request):
    """Serve the JSON document shown after the redirect."""
    body = {"Redirected": True}
    return response.json(body)


if __name__ == '__main__':
    app.run(host="", port=8000)
from sanic import Sanic
from sanic import response

app = Sanic(__name__)


@app.route('/')
async def index(request):
    """Redirect to the URL that `app.url_for` builds for `post_handler`."""
    # Build the URL from the handler name instead of hard-coding the path;
    # with post_id=5 the route pattern below gives `/posts/5`.
    post_url = app.url_for('post_handler', post_id=5)
    return response.redirect(post_url)


@app.route('/posts/<post_id>')
async def post_handler(request, post_id):
    """Echo the `post_id` path parameter back as plain text."""
    message = 'Post - {}'.format(post_id)
    return response.text(message)


if __name__ == '__main__':
    app.run(host="", port=8000, debug=True)
Sanic 提供了一项非常实用的功能:可以将 API 和路由归入一个逻辑集合,该集合可以轻松导入并注册到任意 Sanic 应用程序中,这种机制称为蓝图(Blueprints)。
from sanic import Blueprint, Sanic
from sanic.response import file, json

app = Sanic(__name__)

# Three blueprints, each mounted under its own URL prefix.
blueprint = Blueprint('name', url_prefix='/my_blueprint')
blueprint2 = Blueprint('name2', url_prefix='/my_blueprint2')
blueprint3 = Blueprint('name3', url_prefix='/my_blueprint3')


@blueprint.route('/foo')
async def foo(request):
    """JSON greeting served by the first blueprint."""
    return json({'msg': 'hi from blueprint'})


@blueprint2.route('/foo')
async def foo2(request):
    """JSON greeting served by the second blueprint."""
    return json({'msg': 'hi from blueprint2'})


@blueprint3.route('/foo')
async def index(request):
    """Serve the file `websocket.html`."""
    page = await file('websocket.html')
    return page


@app.websocket('/feed')
async def foo3(request, ws):
    """Send 'hello!' and then wait for a reply, forever, logging both."""
    while True:
        outgoing = 'hello!'
        print('Sending: ' + outgoing)
        await ws.send(outgoing)
        incoming = await ws.recv()
        print('Received: ' + incoming)


# Register every blueprint on the application, then start serving.
for bp in (blueprint, blueprint2, blueprint3):
    app.blueprint(bp)

app.run(host="", port=8000, debug=True)
from sanic import Sanic
from sanic import response
import logging

# One record layout: timestamp, process id, level, code location, message.
logging_format = (
    "[%(asctime)s] %(process)d-%(levelname)s "
    "%(module)s::%(funcName)s():l%(lineno)d: "
    "%(message)s"
)

logging.basicConfig(level=logging.DEBUG, format=logging_format)

# Root logger, configured by the basicConfig call above.
log = logging.getLogger()

sanic = Sanic()


@sanic.route("/")
def test(request):
    """Log the incoming request and answer with the text 'hey'."""
    log.info("received request; responding with 'hey'")
    reply = response.text("hey")
    return reply


sanic.run(host="", port=8000)
以下示例演示了 sanic.app.Sanic.middleware() 的用法:通过中间件为每个传入请求分配唯一的请求 ID,并借助 aiotask-context 将其写入日志。
'''
Based on example from https://github.com/Skyscanner/aiotask-context
and `examples/{override_logging,run_async}.py`.

Needs https://github.com/Skyscanner/aiotask-context/tree/52efbc21e2e1def2d52abb9a8e951f3ce5e6f690 or newer

$ pip install git+https://github.com/Skyscanner/aiotask-context.git
'''

import asyncio
import uuid
import logging
from signal import signal, SIGINT

from sanic import Sanic
from sanic import response

import uvloop
import aiotask_context as context

log = logging.getLogger(__name__)


class RequestIdFilter(logging.Filter):
    """Logging filter that copies the stored request id onto each record."""

    def filter(self, record):
        """Set `record.request_id` from the task context; never drop a record."""
        record.request_id = context.get('X-Request-ID')
        return True


LOG_SETTINGS = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'DEBUG',
            'formatter': 'default',
            'filters': ['requestid'],
        },
    },
    'filters': {
        'requestid': {
            '()': RequestIdFilter,
        },
    },
    'formatters': {
        'default': {
            # %(request_id)s is supplied by RequestIdFilter.
            'format': '%(asctime)s %(levelname)s %(name)s:%(lineno)d %(request_id)s | %(message)s',
        },
    },
    'loggers': {
        '': {
            'level': 'DEBUG',
            'handlers': ['console'],
            'propagate': True
        },
    }
}


app = Sanic(__name__, log_config=LOG_SETTINGS)


@app.middleware('request')
async def set_request_id(request):
    """Store the client's X-Request-ID header, or a fresh UUID, in the task context."""
    request_id = request.headers.get('X-Request-ID') or str(uuid.uuid4())
    context.set("X-Request-ID", request_id)


@app.route("/")
async def test(request):
    """Log the current request id and reply with a small JSON document."""
    log.debug('X-Request-ID: %s', context.get('X-Request-ID'))
    log.info('Hello from test!')
    return response.json({"test": True})


if __name__ == '__main__':
    asyncio.set_event_loop(uvloop.new_event_loop())
    server = app.create_server(host="", port=8000, return_asyncio_server=True)
    loop = asyncio.get_event_loop()
    # Install aiotask_context's task factory on the loop that serves requests.
    loop.set_task_factory(context.task_factory)
    task = asyncio.ensure_future(server)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to end the demo. Other errors now
        # propagate instead of being silently swallowed by a bare `except:`.
        loop.stop()
Sanic Stream支持
import codecs

from sanic import Sanic
from sanic.views import CompositionView
from sanic.views import HTTPMethodView
from sanic.views import stream as stream_decorator
from sanic.blueprints import Blueprint
from sanic.response import stream, text

bp = Blueprint('blueprint_request_stream')
app = Sanic('request_stream')


async def _read_body(request):
    """Read `request.stream` until it yields None and return the joined bytes."""
    chunks = []
    while True:
        body = await request.stream.get()
        if body is None:
            break
        chunks.append(body)
    return b''.join(chunks)


class SimpleView(HTTPMethodView):
    """Class-based view that echoes a streamed POST body."""

    @stream_decorator
    async def post(self, request):
        """Return the streamed request body as text."""
        # Decode once over the whole body: decoding chunk by chunk fails when
        # a multi-byte UTF-8 character is split across two chunks.
        result = (await _read_body(request)).decode('utf-8')
        return text(result)


@app.post('/stream', stream=True)
async def handler(request):
    """Stream the request body back with every '1' replaced by 'A'."""
    async def streaming(response):
        # The incremental decoder buffers a partial multi-byte character
        # until the rest of it arrives in a later chunk.
        decoder = codecs.getincrementaldecoder('utf-8')()
        while True:
            body = await request.stream.get()
            if body is None:
                break
            piece = decoder.decode(body).replace('1', 'A')
            # A chunk holding only part of a character decodes to ''.
            if piece:
                await response.write(piece)
        # Raises UnicodeDecodeError if the body ended inside a character.
        decoder.decode(b'', final=True)
    return stream(streaming)


@bp.put('/bp_stream', stream=True)
async def bp_handler(request):
    """Return the streamed body as text with every '1' replaced by 'A'."""
    result = (await _read_body(request)).decode('utf-8').replace('1', 'A')
    return text(result)


async def post_handler(request):
    """Return the streamed request body as text."""
    result = (await _read_body(request)).decode('utf-8')
    return text(result)


app.blueprint(bp)
app.add_route(SimpleView.as_view(), '/method_view')
view = CompositionView()
view.add(['POST'], post_handler, stream=True)
app.add_route(view, '/composition_view')


if __name__ == '__main__':
    app.run(host='', port=8000)
import requests

# Warning: This is a heavy process.
# Build the payload "1234...249999" in one pass instead of repeated `+=`.
data = "".join(str(i) for i in range(1, 250000))

# An empty URL makes requests raise before anything is sent.
# NOTE(review): assumes the request-stream example server (POST /stream,
# port 8000) is running on this machine - adjust host/port as needed.
r = requests.post('http://127.0.0.1:8000/stream', data=data)
print(r.text)
from sanic import Sanic
from sanic.response import json

import asyncio
import aiohttp

app = Sanic(__name__)

# Created in `init` before the server starts; limits concurrent fetches.
sem = None


@app.listener('before_server_start')
def init(sanic, loop):
    """Create the module-level semaphore used by `bounded_fetch`."""
    global sem
    concurrency_per_worker = 4
    # The `loop=` argument was deprecated in Python 3.8 and removed in 3.10,
    # where passing it raises TypeError, so it is no longer passed.
    sem = asyncio.Semaphore(concurrency_per_worker)


async def bounded_fetch(session, url):
    """
    Use session object to perform 'get' request on url,
    holding the semaphore for the duration of the request.
    """
    async with sem, session.get(url) as response:
        return await response.json()


@app.route("/")
async def test(request):
    """
    Download and serve example JSON
    """
    url = "https://api.github.com/repos/channelcat/sanic"

    async with aiohttp.ClientSession() as session:
        response = await bounded_fetch(session, url)
        return json(response)


app.run(host="", port=8000, workers=2)
# Dockerfile: copy the project into /code, install Sanic from GitHub and
# run simple_server.py.
FROM python:3.5
MAINTAINER Channel Cat <channelcat@gmail.com>

ADD . /code
RUN pip3 install git+https://github.com/channelcat/sanic

EXPOSE 8000

WORKDIR /code

CMD ["python", "simple_server.py"]

# docker-compose.yml: build the image from the current directory and
# publish port 8000.
version: '2'
services:
  sanic:
    build: .
    ports:
      - "8000:8000"
Sanic 通过 sanic.handlers.ErrorHandler 提供了一个可扩展的、最精简的全局异常处理程序实现。本示例说明如何扩展它以实现某些自定义行为。
"""
Example intercepting uncaught exceptions using Sanic's error handler framework.

This may be useful for developers wishing to use Sentry, Airbrake, etc.
or a custom system to log and monitor unexpected errors in production.

We subclass ErrorHandler from sanic.handlers and install an instance of the
subclass on the Sanic application. Inside the subclass' default handler we
can do anything, including sending exceptions to an external service.
"""
from sanic.handlers import ErrorHandler
from sanic.exceptions import SanicException

# Imports and code relevant for our CustomHandler class
# (Ordinarily this would be in a separate file)


class CustomHandler(ErrorHandler):
    """Error handler that prints non-Sanic exceptions before delegating."""

    def default(self, request, exception):
        """Print the exception unless it is a SanicException, then defer to the base handler."""
        # The exception object is available here for logging, reporting to
        # an external service, etc. Exceptions built into Sanic (404s, etc)
        # are treated as trivial and are not printed.
        raised_by_sanic = isinstance(exception, SanicException)
        if not raised_by_sanic:
            print(exception)

        # The client still needs a response; the parent class builds it.
        return super().default(request, exception)


# This is an ordinary Sanic server, with the exception that we set the
# server's error_handler to an instance of our CustomHandler

from sanic import Sanic

app = Sanic(__name__)

handler = CustomHandler()
app.error_handler = handler


@app.route("/")
async def test(request):
    """Raise on every request so the custom handler is exercised."""
    raise SanicException('You Broke It!')


if __name__ == '__main__':
    app.run(host="", port=8000, debug=True)
import logging
import socket
from os import getenv
from platform import node
from uuid import getnode as get_mac

from logdna import LogDNAHandler

from sanic import Sanic
from sanic.response import json
from sanic.request import Request

log = logging.getLogger('logdna')
log.setLevel(logging.INFO)


def get_my_ip_address(remote_server="google.com"):
    """Return the local address of a UDP socket connected to `remote_server` on port 80."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect((remote_server, 80))
        return s.getsockname()[0]


def get_mac_address():
    """Return `uuid.getnode()` as 12 hex digits grouped in colon-separated pairs."""
    # A single iterator is consumed twice per step: `i` from the loop and
    # `next(h)` for its neighbour, which yields the two-character groups.
    h = iter(hex(get_mac())[2:].zfill(12))
    return ":".join(i + next(h) for i in h)


logdna_options = {
    "app": __name__,
    "index_meta": True,
    "hostname": node(),
    "ip": get_my_ip_address(),
    "mac": get_mac_address()
}

logdna_handler = LogDNAHandler(getenv("LOGDNA_API_KEY"), options=logdna_options)

logdna = logging.getLogger(__name__)
logdna.setLevel(logging.INFO)
logdna.addHandler(logdna_handler)

app = Sanic(__name__)


@app.middleware
def log_request(request: Request):
    """Log the URL of every incoming request through the LogDNA handler."""
    logdna.info("I was Here with a new Request to URL: {}".format(request.url))


@app.route("/")
def default(request):
    """Reply with a fixed JSON document."""
    return json({
        "response": "I was here"
    })


if __name__ == "__main__":
    app.run(
        host="",
        # Environment variables are strings; convert so the port is always
        # an int, matching the integer fallback.
        port=int(getenv("PORT", 8080))
    )
from os import getenv

from raygun4py.raygunprovider import RaygunSender

from sanic import Sanic
from sanic.exceptions import SanicException
from sanic.handlers import ErrorHandler


class RaygunExceptionReporter(ErrorHandler):
    """Error handler that sends each exception through a RaygunSender."""

    def __init__(self, raygun_api_key=None):
        """Create the sender.

        :param raygun_api_key: API key for RaygunSender; when None it is read
            from the RAYGUN_API_KEY environment variable.
        """
        super().__init__()
        if raygun_api_key is None:
            raygun_api_key = getenv("RAYGUN_API_KEY")

        self.sender = RaygunSender(raygun_api_key)

    def default(self, request, exception):
        """Send the exception, then let the base handler build the response."""
        self.sender.send_exception(exception=exception)
        return super().default(request, exception)


raygun_error_reporter = RaygunExceptionReporter()
app = Sanic(__name__, error_handler=raygun_error_reporter)


@app.route("/raise")
async def test(request):
    """Raise on every request so the reporter is exercised."""
    raise SanicException('You Broke It!')


if __name__ == '__main__':
    app.run(
        host="",
        # Environment variables are strings; convert so the port is always
        # an int, matching the integer fallback.
        port=int(getenv("PORT", 8080))
    )
import rollbar

from sanic.handlers import ErrorHandler
from sanic import Sanic
from sanic.exceptions import SanicException
from os import getenv

rollbar.init(getenv("ROLLBAR_API_KEY"))


class RollbarExceptionHandler(ErrorHandler):
    """Error handler that reports each exception's message through rollbar."""

    def default(self, request, exception):
        """Report `str(exception)`, then let the base handler build the response."""
        rollbar.report_message(str(exception))
        return super().default(request, exception)


app = Sanic(__name__, error_handler=RollbarExceptionHandler())


@app.route("/raise")
def create_error(request):
    """Raise on every request so the handler is exercised."""
    raise SanicException("I was here and I don't like where I am")


if __name__ == "__main__":
    app.run(
        host="",
        # Environment variables are strings; convert so the port is always
        # an int, matching the integer fallback.
        port=int(getenv("PORT", 8080))
    )
from os import getenv

from sentry_sdk import init as sentry_init
from sentry_sdk.integrations.sanic import SanicIntegration

from sanic import Sanic
from sanic.response import json

# Initialise the Sentry SDK with the Sanic integration; the DSN is read from
# the SENTRY_DSN environment variable.
sentry_init(
    dsn=getenv("SENTRY_DSN"),
    integrations=[SanicIntegration()],
)

app = Sanic(__name__)


# noinspection PyUnusedLocal
@app.route("/working")
async def working_path(request):
    """Reply with a fixed JSON document."""
    return json({
        "response": "Working API Response"
    })


# noinspection PyUnusedLocal
@app.route("/raise-error")
async def raise_error(request):
    """Raise on every request to exercise the Sentry integration."""
    raise Exception("Testing Sentry Integration")


if __name__ == '__main__':
    app.run(
        host="",
        # Environment variables are strings; convert so the port is always
        # an int, matching the integer fallback.
        port=int(getenv("PORT", 8080))
    )
# -*- coding: utf-8 -*-

from sanic import Sanic
from functools import wraps
from sanic.response import json

app = Sanic()


def check_request_for_authorization_status(request):
    """Placeholder authorization check; currently accepts every request."""
    # Note: Define your check, for instance cookie, session.
    return True


def authorized(f):
    """Decorator: run the handler only when the request passes the check.

    Requests that fail the check get a JSON body
    {'status': 'not_authorized'} with status 403.
    """
    @wraps(f)
    async def decorated_function(request, *args, **kwargs):
        # Reject first, so the success path is a single await.
        if not check_request_for_authorization_status(request):
            return json({'status': 'not_authorized'}, 403)

        return await f(request, *args, **kwargs)
    return decorated_function


@app.route("/")
@authorized
async def test(request):
    """Protected endpoint; reached only when the check passes."""
    return json({'status': 'authorized'})


if __name__ == "__main__":
    app.run(host="", port=8000)
Sanic WebSocket
<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket demo</title>
    </head>
    <body>
        <script>
            // Open a socket to /feed on the page's own host and port, and
            // create the list that shows every sent and received message.
            var ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/feed'),
                messages = document.createElement('ul');

            // Append each incoming message to the list.
            ws.onmessage = function (event) {
                var messages = document.getElementsByTagName('ul')[0],
                    message = document.createElement('li'),
                    content = document.createTextNode('Received: ' + event.data);
                message.appendChild(content);
                messages.appendChild(message);
            };
            document.body.appendChild(messages);

            // Send 'bye!' once per second and record it in the list.
            window.setInterval(function() {
                // Declared with `var` so the payload does not become an
                // implicit global.
                var data = 'bye!';
                ws.send(data);
                var messages = document.getElementsByTagName('ul')[0],
                    message = document.createElement('li'),
                    content = document.createTextNode('Sent: ' + data);
                message.appendChild(content);
                messages.appendChild(message);
            }, 1000);
        </script>
    </body>
</html>
from sanic import Sanic
from sanic.response import file

app = Sanic(__name__)


@app.route('/')
async def index(request):
    """Serve the file `websocket.html`."""
    return await file('websocket.html')


@app.websocket('/feed')
async def feed(request, ws):
    """Send 'hello!' and then wait for a reply, forever, logging both."""
    while True:
        data = 'hello!'
        print('Sending: ' + data)
        await ws.send(data)
        data = await ws.recv()
        print('Received: ' + data)


if __name__ == '__main__':
    app.run(host="", port=8000, debug=True)


# ---------------------------------------------------------------------------
# Virtual host support
# ---------------------------------------------------------------------------
from sanic import response
from sanic import Sanic
from sanic.blueprints import Blueprint

# Usage
# curl -H "Host: example.com" localhost:8000
# curl -H "Host: sub.example.com" localhost:8000
# curl -H "Host: bp.example.com" localhost:8000/question
# curl -H "Host: bp.example.com" localhost:8000/answer

app = Sanic()
bp = Blueprint("bp", host="bp.example.com")


# Each handler has its own name: the original called all four `hello`, so
# every definition shadowed the previous one and the names collided.
@app.route('/', host=["example.com",
                      "somethingelse.com",
                      "therestofyourdomains.com"])
async def hello_default(request):
    """Answer "/" for the listed default host names."""
    return response.text("Some defaults")


@app.route('/', host="sub.example.com")
async def hello_sub(request):
    """Answer "/" for sub.example.com."""
    return response.text("42")


@bp.route("/question")
async def bp_question(request):
    """Answer "/question" on the blueprint's host."""
    return response.text("What is the meaning of life?")


@bp.route("/answer")
async def bp_answer(request):
    """Answer "/answer" on the blueprint's host."""
    return response.text("42")


app.blueprint(bp)

if __name__ == '__main__':
    app.run(host="", port=8000)


# ---------------------------------------------------------------------------
# Unit testing with parallel test run support
#
# The following example shows how to get unit tests for a Sanic application
# up and running with the parallel test execution support provided by the
# pytest-xdist plugin.
# ---------------------------------------------------------------------------
"""pytest-xdist example for sanic server

Install testing tools:

    $ pip install pytest pytest-xdist

Run with xdist params:

    $ pytest examples/pytest_xdist.py -n 8  # 8 workers
"""
import re
from sanic import Sanic
from sanic.response import text
from sanic.testing import PORT as PORT_BASE, SanicTestClient
import pytest


@pytest.fixture(scope="session")
def test_port(worker_id):
    """Return PORT_BASE plus the first number found in `worker_id` (0 if none)."""
    m = re.search(r'[0-9]+', worker_id)
    num_id = int(m.group(0)) if m else 0
    return PORT_BASE + num_id


@pytest.fixture(scope="session")
def app():
    """Build a Sanic app whose "/" route answers with the text 'OK'."""
    app = Sanic()

    @app.route('/')
    async def index(request):
        return text('OK')

    return app

@pytest.fixture(scope="session")
def client(app, test_port):
    """Create a SanicTestClient for the `app` fixture on the `test_port` fixture's port."""
    return SanicTestClient(app, test_port)


@pytest.mark.parametrize('run_id', range(100))
def test_index(client, run_id):
    """GET "/" must answer 200 with the body 'OK' (repeated for 100 run ids)."""
    request, response = client._sanic_endpoint_test('get', '/')
    assert response.status == 200
    assert response.text == 'OK'


# ---------------------------------------------------------------------------
# Modifying the request object
#
# The request object in Sanic is a kind of dict, which means it can be
# manipulated like a regular dict object.
# ---------------------------------------------------------------------------
from sanic import Sanic
from sanic.response import text
from random import randint

app = Sanic()


@app.middleware('request')
def append_request(request):
    """Attach a random integer in [0, 100] to every request under the key 'num'."""
    # Add new key with random value
    request['num'] = randint(0, 100)


@app.get('/pop')
def pop_handler(request):
    """Remove 'num' from the request and return it as text."""
    # Pop key from request object
    num = request.pop('num')
    # `num` is an int; convert it so the text response is given a str.
    return text(str(num))


@app.get('/key_exist')
def key_exist_handler(request):
    """Report whether the key 'num' is present on the request."""
    # Check whether the key exists
    if 'num' in request:
        return text('num exist in request')

    # Response text previously misspelled "request" as "reqeust".
    return text('num does not exist in request')


app.run(host="", port=8000, debug=True)