46 lines
1.5 KiB
Python

from datetime import datetime, timedelta
import jwt
import requests
from typing import Dict, Optional
class N8NWebhookJwt:
def __init__(self, secret_key: str, webhook_url: str):
self.secret_key = secret_key
self.webhook_url = webhook_url
self.token_expiration = datetime.now() + timedelta(hours=1)
def _generate_jwt_token(self, payload: Dict) -> str:
"""Generate JWT token with the given payload."""
# Include expiration time (optional)
payload["exp"] = self.token_expiration.timestamp()
encoded_jwt = jwt.encode(
payload,
self.secret_key,
algorithm="HS256",
)
return encoded_jwt #jwt.decode(encoded_jwt, self.secret_key, algorithms=['HS256'])
def send_webhook(self, payload: Dict) -> Dict:
"""Send a webhook request with JWT authentication."""
# Generate JWT token
token = self._generate_jwt_token(payload)
# Set headers with JWT token
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
# Send POST request
response = requests.post(
self.webhook_url,
json=payload,
headers=headers
)
# Handle response
if response.status_code == 200:
return {"status": "success", "response": response.json()}
else:
return {"status": "error", "response": response.status_code, "message": response.text}