Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

FIXES TO RUN IT LOCALLY #1

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions dags/happy_flow_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@

from airflow.decorators import dag, task
from airflow.sensors.sql import SqlSensor
from hooks.inspirehep.inspire_http_hook import InspireHttpHook


@dag(start_date=datetime.datetime(2021, 1, 1), schedule_interval=None)
def happy_flow_dag():
inspire_http_hook = InspireHttpHook()

@task
def fetch_document(filename: str) -> dict:
from include.utils import get_s3_client
from include.utils.utils import get_s3_client

s3_client = get_s3_client()
s3_client.download_file("inspire-incoming", filename, f"./{filename}")
Expand All @@ -19,13 +22,14 @@ def fetch_document(filename: str) -> dict:

@task()
def normalize_affiliations(data):
    """Normalize the author affiliations of a record via the Inspire API.

    The record's authors are sent to the affiliations-normalization
    endpoint through the ``inspire_http_hook`` instance created in the
    enclosing DAG function, and the response is merged back into ``data``.

    Args:
        data: Record dict; it must contain an ``"authors"`` key.

    Returns:
        The value produced by ``assign_normalized_affiliations`` for the
        API response and ``data``.
    """
    # Imported inside the task body rather than at module level, matching
    # the other tasks in this DAG.
    from include.inspire.affiliations_normalization import (
        assign_normalized_affiliations,
    )

    endpoint = "/curation/literature/affiliations-normalization"
    # NOTE(review): workflow_id is hard-coded to 1 here.
    request_data = {"authors": data["authors"], "workflow_id": 1}
    # One request through the shared hook. The superseded
    # call_inspire_api_with_hook helper (and its import) is dropped so the
    # endpoint is not hit twice.
    result = inspire_http_hook.call_api(
        endpoint=endpoint, data=request_data, method="GET"
    )
    data = assign_normalized_affiliations(result.json(), data=data)
    return data

Expand Down
8 changes: 5 additions & 3 deletions dags/process_until_breakpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from airflow.decorators import dag, task
from airflow.operators.python import ShortCircuitOperator
from airflow.utils.trigger_rule import TriggerRule
from hooks.inspirehep.inspire_http_hook import InspireHttpHook


@dag(
Expand All @@ -16,10 +17,12 @@ def check_approval(**context):
if context["params"]["approved"]:
return False
return True
inspire_http_hook = InspireHttpHook()


@task
def fetch_document(filename: str) -> dict:
from include.utils import get_s3_client
from include.utils.utils import get_s3_client

s3_client = get_s3_client()
s3_client.download_file("inspire-incoming", filename, f"./{filename}")
Expand All @@ -29,13 +32,12 @@ def fetch_document(filename: str) -> dict:

@task()
def normalize_affiliations(data):
    """Normalize the author affiliations of a record via the Inspire API.

    The record's authors are sent to the affiliations-normalization
    endpoint through the ``inspire_http_hook`` instance created in the
    enclosing DAG function, and the response is merged back into ``data``.

    Args:
        data: Record dict; it must contain an ``"authors"`` key.

    Returns:
        The value produced by ``assign_normalized_affiliations`` for the
        API response and ``data``.
    """
    # Imported inside the task body rather than at module level, matching
    # the other tasks in this DAG.
    from include.inspire.affiliations_normalization import (
        assign_normalized_affiliations,
    )

    endpoint = "/curation/literature/affiliations-normalization"
    # NOTE(review): workflow_id is hard-coded to 1 here.
    request_data = {"authors": data["authors"], "workflow_id": 1}
    # Go through the hook's call_api method (as happy_flow_dag does) instead
    # of calling the hook instance itself. The superseded
    # call_inspire_api_with_hook helper and its import are dropped so the
    # endpoint is hit only once.
    result = inspire_http_hook.call_api(
        endpoint=endpoint, data=request_data, method="GET"
    )
    data = assign_normalized_affiliations(result.json(), data=data)
    return data

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ x-airflow-common:
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
# S3 config
S3_USER: airflow
Expand Down
6 changes: 3 additions & 3 deletions plugins/hooks/backoffice/workflow_management_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ def set_workflow_status(self, status_name: str, workflow_id: str) -> Response:
)

def get_workflow(self, workflow_id: str) -> dict:
    """Fetch a single workflow and return its decoded JSON body.

    Args:
        workflow_id: Identifier interpolated into the
            ``/api/workflows/{workflow_id}`` endpoint.

    Returns:
        The parsed JSON payload of the response.
    """
    endpoint = f"/api/workflows/{workflow_id}"
    # Issue the GET once, through the retrying wrapper, like the other
    # methods of this hook. Previously the retried response was overwritten
    # by a second plain ``self.run`` call, doubling the request and
    # discarding the retry result.
    # NOTE(review): this assumes the retrying path applies the hook's
    # default headers, as the sibling PUT/PATCH methods rely on - confirm.
    response = self.run_with_advanced_retry(
        _retry_args=self.tenacity_retry_kwargs, method="GET", endpoint=endpoint
    )
    return response.json()

def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response:
endpoint = f"/workflows/{workflow_id}/"
endpoint = f"/api/workflows/{workflow_id}/"
return self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
method="PUT",
Expand All @@ -48,7 +48,7 @@ def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response:
def partial_update_workflow(
self, workflow_id: str, workflow_partial_update_data: dict
) -> Response:
endpoint = f"/workflow-update/{workflow_id}/"
endpoint = f"/api/workflow-update/{workflow_id}/"
return self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
method="PATCH",
Expand Down
4 changes: 2 additions & 2 deletions plugins/hooks/backoffice/workflow_ticket_management_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(
self.endpoint = "/workflow-ticket/"

def get_ticket(self, workflow_id: str, ticket_type: str) -> dict:
endpoint = f"/workflow-ticket/{workflow_id}/"
endpoint = f"/api/workflow-ticket/{workflow_id}/"
params = {"ticket_type": ticket_type}
response = self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
Expand All @@ -35,7 +35,7 @@ def get_ticket(self, workflow_id: str, ticket_type: str) -> dict:
def create_ticket_entry(
self, workflow_id: str, ticket_id: str, ticket_type: str
) -> Response:
endpoint = f"/workflow-ticket/"
endpoint = f"/api/workflow-ticket/"
data = {
"ticket_type": ticket_type,
"ticket_id": ticket_id,
Expand Down
File renamed without changes.