taskinity to lekki framework do definiowania i uruchamiania przepływów zadań za pomocą prostego języka DSL i dekoratorów Python. Jest znacznie prostszy niż Prefect, Airflow czy Luigi i działa natychmiast bez skomplikowanej konfiguracji.
taskinity jest idealny dla małych i średnich projektów. Dla dużych projektów z setkami zadań i skomplikowaną logiką, warto rozważyć bardziej zaawansowane narzędzia jak Airflow czy Prefect. Jednak nawet w dużych projektach taskinity może być używany do prototypowania i szybkiego testowania przepływów.
Tak, taskinity jest aktywnie rozwijany. Regularnie dodajemy nowe funkcje i ulepszenia, takie jak równoległe wykonanie, planowanie przepływów i przetwarzanie email.
taskinity nie wymaga skomplikowanej instalacji. Wystarczy sklonować repozytorium i zainstalować zależności:
# Klonowanie repozytorium
git clone https://github.com/taskinity/hubmail.git
cd hubmail/dsl
# Instalacja zależności
pip install -r requirements.txt
requirements.txt
Tak, taskinity działa na wszystkich głównych systemach operacyjnych: Windows, Linux i macOS.
Tak, taskinity może być łatwo uruchamiany w kontenerach Docker. W repozytorium znajduje się przykładowy plik Dockerfile.dashboard
i docker-compose-email.yml
.
Zadania definiuje się za pomocą dekoratora @task
:
from flow_dsl import task
@task(name="Pobieranie danych", description="Pobiera dane z API")
def fetch_data(url: str):
# Implementacja
return data
Przepływy definiuje się za pomocą prostego języka DSL:
flow_dsl = """
flow DataProcessing:
description: "Przetwarzanie danych"
fetch_data -> process_data
process_data -> save_results
"""
Tak, możesz łączyć jedno zadanie z wieloma zadaniami:
flow_dsl = """
flow DataProcessing:
description: "Przetwarzanie danych"
fetch_data -> process_data
fetch_data -> log_data
process_data -> save_results
"""
Obecnie taskinity nie obsługuje bezpośrednio warunków w definicji DSL. Możesz jednak implementować logikę warunkową wewnątrz zadań.
Przepływy uruchamia się za pomocą funkcji run_flow_from_dsl
:
from flow_dsl import run_flow_from_dsl
results = run_flow_from_dsl(flow_dsl, {"url": "https://example.com/data"})
Dane wejściowe przekazuje się jako drugi argument funkcji run_flow_from_dsl
:
results = run_flow_from_dsl(flow_dsl, {
"url": "https://example.com/data",
"api_key": "your_api_key"
})
Wyniki przepływu są zwracane przez funkcję run_flow_from_dsl
:
results = run_flow_from_dsl(flow_dsl, input_data)
print(results)
Tak, możesz wczytać definicję DSL z pliku:
from flow_dsl import load_dsl, run_flow_from_dsl
dsl_content = load_dsl("dsl_definitions/my_flow.dsl")
results = run_flow_from_dsl(dsl_content, input_data)
taskinity zawiera narzędzia do wizualizacji przepływów:
from flow_visualizer import visualize_dsl
visualize_dsl(flow_dsl, output_file="flow_diagram.png")
taskinity obsługuje wizualizację w formatach PNG, SVG i Mermaid.
Tak, możesz wizualizować historię wykonania przepływu:
from flow_visualizer import visualize_flow
visualize_flow(flow_id, output_file="execution_diagram.png")
Dashboard automatycznie wizualizuje przepływy. Wystarczy uruchomić dashboard i przejść do zakładki “Wizualizacja”.
taskinity zawiera dwa dashboardy:
python mini_dashboard.py
python simple_dashboard.py
Mini Dashboard działa domyślnie na porcie 8765, a Pełny Dashboard na porcie 8000.
Tak, możesz zmienić port dashboardu, edytując odpowiedni plik:
# mini_dashboard.py
app.run(host="0.0.0.0", port=8765) # Zmień port na wybrany
Powiadomienia email konfiguruje się w pliku config/notification_config.json
:
{
"enabled": true,
"email": {
"enabled": true,
"smtp_server": "smtp.example.com",
"smtp_port": 587,
"username": "user@example.com",
"password": "password123",
"from_email": "taskinity@example.com",
"recipients": ["admin@example.com"]
}
}
Powiadomienia Slack konfiguruje się w pliku config/notification_config.json
:
{
"enabled": true,
"slack": {
"enabled": true,
"webhook_url": "https://hooks.slack.com/services/XXX/YYY/ZZZ",
"channel": "#flow-notifications",
"username": "taskinity Bot"
}
}
Powiadomienia są wysyłane w zależności od konfiguracji w sekcji notification_rules
:
{
"notification_rules": {
"on_start": true,
"on_complete": true,
"on_error": true,
"include_details": true
}
}
Możesz wysłać powiadomienie ręcznie za pomocą funkcji send_email_notification
i send_slack_notification
:
from notification_service import send_email_notification, send_slack_notification
send_email_notification("Temat", "Treść wiadomości")
send_slack_notification("Tytuł", "Treść wiadomości")
Przepływy z równoległym wykonaniem uruchamia się za pomocą funkcji run_parallel_flow_from_dsl
:
from parallel_executor import run_parallel_flow_from_dsl
results = run_parallel_flow_from_dsl(flow_dsl, input_data)
Równoległe wykonanie ma następujące ograniczenia:
Możesz kontrolować liczbę równoległych zadań za pomocą parametru max_workers
:
results = run_parallel_flow_from_dsl(flow_dsl, input_data, max_workers=4)
Równoległe wykonanie jest szybsze dla przepływów z niezależnymi zadaniami, które mogą być wykonywane jednocześnie. Dla przepływów sekwencyjnych nie ma różnicy w wydajności.
Przepływy planuje się za pomocą modułu flow_scheduler
:
from flow_scheduler import Scheduler
scheduler = Scheduler()
scheduler.add_schedule("dsl_definitions/my_flow.dsl", interval=60)
scheduler.start()
taskinity obsługuje następujące typy harmonogramów:
Możesz zarządzać harmonogramami za pomocą linii poleceń:
# Uruchomienie planera
python flow_scheduler.py start
# Utworzenie harmonogramu (co 60 minut)
python flow_scheduler.py create dsl_definitions/my_flow.dsl 60
# Lista harmonogramów
python flow_scheduler.py list
# Ręczne uruchomienie harmonogramu
python flow_scheduler.py run [schedule_id]
# Usunięcie harmonogramu
python flow_scheduler.py delete [schedule_id]
Tak, możesz przekazać dane wejściowe do zaplanowanego przepływu:
scheduler.add_schedule(
"dsl_definitions/my_flow.dsl",
interval=60,
input_data={"url": "https://example.com/data"}
)
Przetwarzanie email konfiguruje się w pliku config/email_config.json
:
{
"imap": {
"server": "imap.example.com",
"port": 993,
"username": "user@example.com",
"password": "password123",
"folder": "INBOX",
"ssl": true
},
"smtp": {
"server": "smtp.example.com",
"port": 587,
"username": "user@example.com",
"password": "password123",
"from_email": "user@example.com",
"use_tls": true
}
}
Przetwarzanie email uruchamia się za pomocą modułu email_pipeline
:
from email_pipeline import EmailProcessor
processor = EmailProcessor()
processor.process_emails()
Automatyczne odpowiedzi konfiguruje się w sekcji auto_reply
pliku config/email_config.json
:
{
"auto_reply": {
"enabled": true,
"criteria": {
"subject_contains": ["pytanie", "zapytanie", "pomoc", "wsparcie"],
"from_domains": ["example.com", "gmail.com"],
"priority_keywords": ["pilne", "ważne", "urgent", "asap"]
},
"templates": {
"default": "Dziękujemy za wiadomość. Odpowiemy najszybciej jak to możliwe.",
"priority": "Dziękujemy za pilną wiadomość. Zajmiemy się nią priorytetowo.",
"support": "Dziękujemy za zgłoszenie. Nasz zespół wsparcia skontaktuje się z Tobą wkrótce."
}
}
}
Uruchamianie przepływów na podstawie emaili konfiguruje się w sekcji flows
pliku config/email_config.json
:
{
"flows": {
"trigger_flow_on_email": true,
"flow_mapping": {
"support": "support_flow.dsl",
"order": "order_processing.dsl",
"complaint": "complaint_handling.dsl"
}
}
}
Sprawdź składnię DSL, zwracając uwagę na:
Sprawdź:
Sprawdź:
Logi znajdują się w katalogu logs/
:
flow_dsl.log
- logi głównego modułumini_dashboard.log
- logi mini dashboardusimple_dashboard.log
- logi pełnego dashboardunotification_service.log
- logi serwisu powiadomieńparallel_executor.log
- logi równoległego wykonawcyflow_scheduler.log
- logi planeraemail_pipeline.log
- logi procesora email