docs/reference/adapter/starlette/handler.html
def to_bolt_request(req: starlette.requests.Request,body: bytes,addition_context_properties: Dict[str, Any] | None = None) ‑> BoltRequest#Expand source code
def to_bolt_request(
req: Request,
body: bytes,
addition_context_properties: Optional[Dict[str, Any]] = None,
) -> BoltRequest:
request = BoltRequest(
body=body.decode("utf-8"),
query=req.query_params, # type: ignore[arg-type]
headers=req.headers, # type: ignore[arg-type]
)
if addition_context_properties is not None:
for k, v in addition_context_properties.items():
request.context[k] = v
return request
def to_starlette_response(bolt_resp: BoltResponse) ‑> starlette.responses.Response#Expand source code
def to_starlette_response(bolt_resp: BoltResponse) -> Response:
resp = Response(
status_code=bolt_resp.status,
content=bolt_resp.body,
headers=bolt_resp.first_headers_without_set_cookie(),
)
for cookie in bolt_resp.cookies():
for name, c in cookie.items():
resp.set_cookie(
key=name,
value=c.value,
max_age=c.get("max-age"),
expires=c.get("expires"),
path=c.get("path"),
domain=c.get("domain"),
secure=True,
httponly=True,
)
return resp
class SlackRequestHandler (app: App)#Expand source code
class SlackRequestHandler:
def __init__ (self, app: App):
self.app = app
async def handle(self, req: Request, addition_context_properties: Optional[Dict[str, Any]] = None) -> Response:
body = await req.body()
if req.method == "GET":
if self.app.oauth_flow is not None:
oauth_flow: OAuthFlow = self.app.oauth_flow
if req.url.path == oauth_flow.install_path:
bolt_resp = oauth_flow.handle_installation(to_bolt_request(req, body, addition_context_properties))
return to_starlette_response(bolt_resp)
elif req.url.path == oauth_flow.redirect_uri_path:
bolt_resp = oauth_flow.handle_callback(to_bolt_request(req, body, addition_context_properties))
return to_starlette_response(bolt_resp)
elif req.method == "POST":
bolt_resp = self.app.dispatch(to_bolt_request(req, body, addition_context_properties))
return to_starlette_response(bolt_resp)
return Response(
status_code=404,
content="Not found",
)
async def handle(self,req: starlette.requests.Request,addition_context_properties: Dict[str, Any] | None = None) ‑> starlette.responses.Response#Expand source code
async def handle(self, req: Request, addition_context_properties: Optional[Dict[str, Any]] = None) -> Response:
body = await req.body()
if req.method == "GET":
if self.app.oauth_flow is not None:
oauth_flow: OAuthFlow = self.app.oauth_flow
if req.url.path == oauth_flow.install_path:
bolt_resp = oauth_flow.handle_installation(to_bolt_request(req, body, addition_context_properties))
return to_starlette_response(bolt_resp)
elif req.url.path == oauth_flow.redirect_uri_path:
bolt_resp = oauth_flow.handle_callback(to_bolt_request(req, body, addition_context_properties))
return to_starlette_response(bolt_resp)
elif req.method == "POST":
bolt_resp = self.app.dispatch(to_bolt_request(req, body, addition_context_properties))
return to_starlette_response(bolt_resp)
return Response(
status_code=404,
content="Not found",
)