在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称(OpenSource Name):graphql-python/graphql-ws开源软件地址(OpenSource Url):https://github.com/graphql-python/graphql-ws开源编程语言(OpenSource Language):Python 87.0%开源软件介绍(OpenSource Introduction):GraphQL WSWebsocket backend for GraphQL subscriptions. Supports the following application servers: Python 3 application servers, using asyncio:
Python 2 application servers:
Installation instructionsFor instaling graphql-ws, just run this command in your shell pip install graphql-ws ExamplesPython 3 serversCreate a subscribable schema like this: import asyncio
import graphene
class Query(graphene.ObjectType):
hello = graphene.String()
@staticmethod
def resolve_hello(obj, info, **kwargs):
return "world"
class Subscription(graphene.ObjectType):
count_seconds = graphene.Float(up_to=graphene.Int())
async def resolve_count_seconds(root, info, up_to):
for i in range(up_to):
yield i
await asyncio.sleep(1.)
yield up_to
schema = graphene.Schema(query=Query, subscription=Subscription) aiohttpThen just plug into your aiohttp server. from graphql_ws.aiohttp import AiohttpSubscriptionServer
from .schema import schema
subscription_server = AiohttpSubscriptionServer(schema)
async def subscriptions(request):
ws = web.WebSocketResponse(protocols=('graphql-ws',))
await ws.prepare(request)
await subscription_server.handle(ws)
return ws
app = web.Application()
app.router.add_get('/subscriptions', subscriptions)
web.run_app(app, port=8000) You can see a full example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/aiohttp websockets compatible serversWorks with any framework that uses the websockets library for its websocket implementation. For this example, plug in your Sanic server. from graphql_ws.websockets_lib import WsLibSubscriptionServer
from . import schema
app = Sanic(__name__)
subscription_server = WsLibSubscriptionServer(schema)
@app.websocket('/subscriptions', subprotocols=['graphql-ws'])
async def subscriptions(request, ws):
await subscription_server.handle(ws)
return ws
app.run(host="0.0.0.0", port=8000) Django v2+Django Channels 2Set up with Django Channels just takes three steps:
First INSTALLED_APPS = [
"channels",
"graphql_ws.django",
"graphene_django",
# ...
] Point to your schema in Django settings: GRAPHENE = {
'SCHEMA': 'yourproject.schema.schema'
} Finally, you can set up channels routing yourself (maybe using
ASGI_APPLICATION = 'graphql_ws.django.routing.application'
# or
ASGI_APPLICATION = 'graphql_ws.django.routing.auth_application' Run Python 2 serversCreate a subscribable schema like this: import graphene
from rx import Observable
class Query(graphene.ObjectType):
hello = graphene.String()
@staticmethod
def resolve_hello(obj, info, **kwargs):
return "world"
class Subscription(graphene.ObjectType):
count_seconds = graphene.Float(up_to=graphene.Int())
async def resolve_count_seconds(root, info, up_to=5):
return Observable.interval(1000)\
.map(lambda i: "{0}".format(i))\
.take_while(lambda i: int(i) <= up_to)
schema = graphene.Schema(query=Query, subscription=Subscription) Gevent compatible serversThen just plug into your Gevent server, for example, Flask: from flask_sockets import Sockets
from graphql_ws.gevent import GeventSubscriptionServer
from schema import schema
subscription_server = GeventSubscriptionServer(schema)
app.app_protocol = lambda environ_path_info: 'graphql-ws'
@sockets.route('/subscriptions')
def echo_socket(ws):
subscription_server.handle(ws)
return [] You can see a full example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/flask_gevent Django v1.xFor Django v1.x and Django Channels v1.x, setup your schema in GRAPHENE = {
'SCHEMA': 'yourproject.schema.schema'
} Then CHANNELS_WS_PROTOCOLS = ["graphql-ws", ]
CHANNEL_LAYERS = {
"default": {
"BACKEND": "asgiref.inmemory.ChannelLayer",
"ROUTING": "django_subscriptions.urls.channel_routing",
},
} And finally add the channel routes from channels.routing import route_class
from graphql_ws.django_channels import GraphQLSubscriptionConsumer
channel_routing = [
route_class(GraphQLSubscriptionConsumer, path=r"^/subscriptions"),
] You can see a full example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/django_subscriptions |
2023-10-27
2022-08-15
2022-08-17
2022-09-23
2022-08-13
请发表评论