feat: celery decoupling
This commit is contained in:
parent
a9768baf12
commit
c17153112b
10 changed files with 182 additions and 3 deletions
|
@ -2,4 +2,6 @@ WG_INTERFACE=wg0
|
|||
#False for production True for testing and debugging
|
||||
DEBUG=False
|
||||
#See here https://docs.djangoproject.com/en/5.1/ref/settings/#allowed-hosts
|
||||
ALLOWED_HOSTS=127.0.0.1
|
||||
ALLOWED_HOSTS=127.0.0.1
|
||||
CELERY_BROKER=redis+socket:///vk/vk.socket
|
||||
CELERY_BACKEND=redis+socket:///vk/vk.socket
|
|
@ -11,5 +11,5 @@ WORKDIR /app
|
|||
RUN bash -c '/venv/bin/pip install -r requirements.txt; \
|
||||
/venv/bin/python manage.py migrate ; \
|
||||
/venv/bin/python manage.py collectstatic --noinput'
|
||||
EXPOSE 8000
|
||||
CMD ["/venv/bin/python","manage.py","runserver","0.0.0.0:8000"]
|
||||
EXPOSE 4000
|
||||
CMD ["/venv/bin/gunicorn","wg_manager.wsgi","0.0.0.0:4000"]
|
9
Containerfile-worker
Normal file
9
Containerfile-worker
Normal file
|
@ -0,0 +1,9 @@
|
|||
FROM docker.io/python:slim
|
||||
RUN bash -c 'mkdir /app;\
|
||||
python -m venv /venv'
|
||||
COPY wg_manager /app/wg_manager
|
||||
COPY wg_connection_manager_worker /app/wg_connection_manager_worker
|
||||
COPY requirements-worker.txt .env /app
|
||||
WORKDIR /app
|
||||
RUN bash -c '/venv/bin/pip install -r requirements-worker.txt'
|
||||
CMD ["/venv/bin/celery","-A","wg_connection_manager_worker.tasks","worker","--loglevel=INFO"]
|
47
compose.yaml
Normal file
47
compose.yaml
Normal file
|
@ -0,0 +1,47 @@
|
|||
services:
|
||||
setup:
|
||||
image: docker.io/busybox
|
||||
volumes:
|
||||
- vk:/vk
|
||||
command: chmod 777 /vk
|
||||
vk:
|
||||
image: docker.io/valkey/valkey
|
||||
environment:
|
||||
- VALKEY_EXTRA_FLAGS=--unixsocket /vk/vk.socket --unixsocketperm 777
|
||||
volumes:
|
||||
- vk:/vk
|
||||
depends_on:
|
||||
- setup
|
||||
wg-manager-worker:
|
||||
image: wg-manager-worker
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Containerfile-worker
|
||||
environment:
|
||||
- CELERY_BROKER=redis+socket:///vk/vk.socket
|
||||
- CELERY_BACKEND=redis+socket:///vk/vk.socket
|
||||
depends_on:
|
||||
- vk
|
||||
network_mode: "host"
|
||||
cap_add:
|
||||
- NET_ADMIN
|
||||
volumes:
|
||||
- vk:/vk
|
||||
wg-manager:
|
||||
image: wg-manager
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Containerfile
|
||||
ports:
|
||||
- "4000:4000"
|
||||
depends_on:
|
||||
- vk
|
||||
volumes:
|
||||
- vk:/vk
|
||||
|
||||
networks:
|
||||
default:
|
||||
host:
|
||||
|
||||
volumes:
|
||||
vk:
|
4
requirements-worker.txt
Normal file
4
requirements-worker.txt
Normal file
|
@ -0,0 +1,4 @@
|
|||
celery
|
||||
pywireguard==2.*
|
||||
django-environ
|
||||
py-redis
|
|
@ -3,3 +3,4 @@ pywireguard==2.*
|
|||
whitenoise==6.7.0
|
||||
django-environ
|
||||
gunicorn
|
||||
py-redis
|
61
wg_connection_manager/dj_wg_manager_task.py
Normal file
61
wg_connection_manager/dj_wg_manager_task.py
Normal file
|
@ -0,0 +1,61 @@
|
|||
from django.db import transaction
|
||||
from .models import UserConnection
|
||||
from pywireguard import Peer
|
||||
import environ
|
||||
from celery import Celery
|
||||
from .wg_manager import WGManager
|
||||
|
||||
|
||||
env = environ.Env(WG_INTERFACE=(str, "wg0"), CELERY_BROKER=(str, "redis://localhost"), CELERY_BACKEND=(str, "redis://localhost"))
|
||||
environ.Env.read_env(".env")
|
||||
WG_INTERFACE = env("WG_INTERFACE")
|
||||
CELERY_BROKER = env("CELERY_BROKER")
|
||||
CELERY_BACKEND = env("CELERY_BACKEND")
|
||||
|
||||
|
||||
class DJWGManager:
|
||||
wg_manager: WGManager
|
||||
app: Celery
|
||||
|
||||
def __init__(self):
|
||||
self.app = Celery("wg_manager_tasks", broker=CELERY_BROKER, backend=CELERY_BACKEND)
|
||||
|
||||
def sync(self):
|
||||
with transaction.atomic():
|
||||
UserConnection.objects.filter(active=True).update(active=False)
|
||||
res = self.app.send_task("wg_connection_manager_worker.tasks.get_peers")
|
||||
peers = res.get()
|
||||
for peer in peers:
|
||||
pk = peer.public_key.decode("ascii")
|
||||
psk = peer.preshared_key.decode("ascii")
|
||||
if not peer.allowed_ips:
|
||||
continue
|
||||
connection = UserConnection.objects.filter(public_key=pk)
|
||||
if len(connection) == 1:
|
||||
connection = connection[0]
|
||||
else:
|
||||
connection = UserConnection()
|
||||
connection.public_key = pk
|
||||
connection.preshared_key = psk
|
||||
connection.active = True
|
||||
connection.vpn_ip = peer.allowed_ips[0]
|
||||
connection.save()
|
||||
|
||||
def add_peer(self, user_connection: UserConnection):
|
||||
self.app.send_task(
|
||||
"wg_connection_manager_worker.tasks.add_peer",
|
||||
[Peer(public_key=user_connection.public_key, preshared_key=user_connection.preshared_key, allowed_ips=[user_connection.vpn_ip])],
|
||||
)
|
||||
self.sync()
|
||||
|
||||
def remove_peer(self, user_connection: UserConnection):
|
||||
pk = user_connection.public_key
|
||||
res = self.app.send_task("wg_connection_manager_worker.tasks.get_peers")
|
||||
peers = res.get()
|
||||
peer = list(filter(lambda x: x.public_key.decode("ascii") == pk, peers))
|
||||
if not peer:
|
||||
# TODO raise exception/ignore?
|
||||
return
|
||||
peer = peer[0]
|
||||
self.app.send_task("wg_connection_manager_worker.tasks.remove_peer", [peer])
|
||||
self.sync()
|
0
wg_connection_manager_worker/__init__.py
Normal file
0
wg_connection_manager_worker/__init__.py
Normal file
35
wg_connection_manager_worker/tasks.py
Normal file
35
wg_connection_manager_worker/tasks.py
Normal file
|
@ -0,0 +1,35 @@
|
|||
import environ
|
||||
from celery import Celery
|
||||
from pywireguard.factory import Peer
|
||||
from .wg_manager import WGManager
|
||||
|
||||
|
||||
env = environ.Env(WG_INTERFACE=(str, "wg0"), CELERY_BROKER=(str, "redis://localhost"), CELERY_BACKEND=(str, "redis://localhost"))
|
||||
environ.Env.read_env(".env")
|
||||
WG_INTERFACE = env("WG_INTERFACE")
|
||||
CELERY_BROKER = env("CELERY_BROKER")
|
||||
CELERY_BACKEND = env("CELERY_BACKEND")
|
||||
app = Celery("wg_manager_tasks", broker=CELERY_BROKER, backend=CELERY_BACKEND)
|
||||
|
||||
|
||||
@app.task
|
||||
def add_peer(peer: Peer):
|
||||
wg = WGManager(WG_INTERFACE)
|
||||
wg.add_peer(peer)
|
||||
|
||||
|
||||
@app.task
|
||||
def remove_peer(peer: Peer):
|
||||
wg = WGManager(WG_INTERFACE)
|
||||
wg.remove_peer(peer)
|
||||
|
||||
|
||||
@app.task
|
||||
def get_peers() -> list[Peer]:
|
||||
wg = WGManager(WG_INTERFACE)
|
||||
return wg.get_peers()
|
||||
|
||||
|
||||
@app.task
|
||||
def ping() -> str:
|
||||
return "pong"
|
20
wg_connection_manager_worker/wg_manager.py
Normal file
20
wg_connection_manager_worker/wg_manager.py
Normal file
|
@ -0,0 +1,20 @@
|
|||
from pywireguard.factory import Interface, Peer
|
||||
|
||||
|
||||
class WGManager:
|
||||
interface: str
|
||||
|
||||
def __init__(self, interface: str):
|
||||
self.interface = interface
|
||||
|
||||
def get_peers(self) -> list[Peer]:
|
||||
wginterface = Interface(self.interface)
|
||||
return wginterface.peers
|
||||
|
||||
def add_peer(self, peer: Peer):
|
||||
wginterface = Interface(self.interface)
|
||||
wginterface.upsert_peer(peer)
|
||||
|
||||
def remove_peer(self, peer: Peer):
|
||||
wginterface = Interface(self.interface)
|
||||
wginterface.remove_peer(peer)
|
Loading…
Reference in a new issue