Merge pull request #135 from IFRCGo/feature/global_flood_database
GFD Extraction and Transformation.
ranjan-stha authored Feb 14, 2025
2 parents b57cff1 + 15173a6 commit b3cd08f
Showing 18 changed files with 530 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
@@ -25,6 +25,9 @@ env:
EMDAT_AUTHORIZATION_KEY: dummy-value
IDMC_CLIENT_ID: dummy-value
EOAPI_DOMAIN: https://montandon-eoapi.dummy.com
GFD_CREDENTIAL: dummy-value
GFD_SERVICE_ACCOUNT: dummy-value


jobs:
pre_commit_checks:
12 changes: 12 additions & 0 deletions README.md
@@ -28,6 +28,18 @@
```bash
docker-compose exec web python manage.py extract_emdat_data
```
- Command to import IDU data.
```bash
docker-compose exec web python manage.py extract_idu_data
```
- Command to import GIDD data.
```bash
docker-compose exec web python manage.py extract_gidd_data
```
- Command to import Global Flood Database data.
```bash
docker-compose exec web python manage.py extract_gfd_data
```
- To view the imported data in the admin panel, you need to create yourself as a superuser:
```bash
docker-compose exec web python manage.py createsuperuser
25 changes: 25 additions & 0 deletions apps/etl/etl_tasks/gfd.py
@@ -0,0 +1,25 @@
from datetime import datetime, timedelta

from celery import chain, shared_task

from apps.etl.extraction.sources.gfd.extract import GFDExtraction
from apps.etl.transform.sources.gfd import GFDTransformHandler


@shared_task
def ext_and_transform_gfd_historical_data():
    chain(
        GFDExtraction.task.s(),
        GFDTransformHandler.task.s(),
    ).apply_async()


@shared_task
def ext_and_transform_gfd_latest_data():
    end_date = datetime.now().date()
    start_date = end_date - timedelta(days=1)

    chain(
        GFDExtraction.task.s(start_date, end_date),
        GFDTransformHandler.task.s(),
    ).apply_async()
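
A minimal sketch of queueing these chained tasks ad hoc, assuming a running Celery worker and broker with the project's default configuration (the Django shell session is illustrative):

```python
# Hypothetical ad-hoc invocation from a Django shell; assumes a Celery worker
# and broker are running with the project's default configuration.
from apps.etl.etl_tasks.gfd import (
    ext_and_transform_gfd_historical_data,
    ext_and_transform_gfd_latest_data,
)

# Queue a full historical backfill (no date filter).
ext_and_transform_gfd_historical_data.delay()

# Queue only the previous day's events, mirroring the daily beat schedule.
ext_and_transform_gfd_latest_data.delay()
```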
Empty file.
180 changes: 180 additions & 0 deletions apps/etl/extraction/sources/gfd/extract.py
@@ -0,0 +1,180 @@
import base64
import hashlib
import json
import logging
import tempfile
from typing import Any, Callable

import ee
import requests
from django.conf import settings

from apps.etl.extraction.sources.base.handler import BaseExtraction
from apps.etl.extraction.sources.base.utils import manage_duplicate_file_content
from apps.etl.models import ExtractionData
from main.celery import app

logger = logging.getLogger(__name__)


DATA_URL = "https://earthengine.googleapis.com/v1alpha/projects/earthengine-legacy/assets/GLOBAL_FLOOD_DB/MODIS_EVENTS/V1"


class GFDExtraction(BaseExtraction):

    @classmethod
    def decode_json(cls, encoded_str):
        """Decodes a Base64 string back to a JSON object."""
        decoded_data = base64.urlsafe_b64decode(encoded_str.encode()).decode()
        return json.loads(decoded_data)

    @classmethod
    def get_json_credentials(cls, content):
        with tempfile.NamedTemporaryFile(delete=False, mode="w") as temp_file:
            json_string = json.dumps(content, sort_keys=True)
            temp_file.write(json_string)
            temp_path = temp_file.name
        return temp_path

    @classmethod
    def hash_json_content(cls, json_data):
        """Hashes a JSON object using SHA256."""
        json_string = json.dumps(json_data, sort_keys=True)
        return hashlib.sha256(json_string.encode()).hexdigest()

    @classmethod
    def store_extraction_data(
        cls,
        validate_source_func: Callable[[Any], None],
        source: int,
        response: dict,
        instance_id: int = None,
    ):
        """
        Save extracted data into the database. Checks for duplicate content using hashing.
        """
        file_extension = "json"
        file_name = f"{source}.{file_extension}"
        resp_data_content = json.dumps(response)

        # Save the additional response metadata after the data is fetched from the API.
        extraction_instance = ExtractionData.objects.get(id=instance_id)
        extraction_instance.resp_data_type = "application/json"
        extraction_instance.save(update_fields=["resp_data_type"])

        # Validate the non-empty response data.
        if resp_data_content:
            # Source validation: run the validator once and reuse its result.
            if validate_source_func:
                validation_result = validate_source_func(resp_data_content)
                extraction_instance.source_validation_status = validation_result["status"]
                extraction_instance.content_validation = validation_result["validation_error"]

            # Manage duplicate file content.
            hash_content = cls.hash_json_content(resp_data_content)
            manage_duplicate_file_content(
                source=extraction_instance.source,
                hash_content=hash_content,
                instance=extraction_instance,
                response_data=resp_data_content,
                file_name=file_name,
            )
        return extraction_instance

    @classmethod
    def _save_response_data(cls, instance: ExtractionData, response):
        """
        Save the extracted response data to the extraction instance.
        Args:
            instance: ExtractionData instance to save to
            response: Extracted response content
        Returns:
            The response content that was saved
        """
        cls.store_extraction_data(
            response=response,
            source=instance.source,
            validate_source_func=None,
            instance_id=instance.id,
        )

        return response

    @classmethod
    def get_flood_data(cls, collection, batch_size=1000):
        """Retrieve flood metadata in batches to avoid memory issues."""
        total_size = collection.size().getInfo()

        all_data = []
        for i in range(0, total_size, batch_size):
            batch = collection.toList(batch_size, i).getInfo()
            all_data.extend(batch)

        return all_data

    @classmethod
    def handle_extraction(cls, url: str, source: int, start_date, end_date) -> int:
        """
        Process data extraction.
        Returns:
            int: ID of the extraction instance
        """
        logger.info("Starting data extraction")
        instance = cls._create_extraction_instance(url=url, source=source)

        try:
            cls._update_instance_status(instance, ExtractionData.Status.IN_PROGRESS)
            response = cls.extract_data(start_date, end_date)
            response_data = cls._save_response_data(instance, response)
            # Check if response contains data
            if response_data:
                cls._update_instance_status(instance, ExtractionData.Status.SUCCESS)
                logger.info("Data extracted successfully")
            else:
                cls._update_instance_status(
                    instance,
                    ExtractionData.Status.SUCCESS,
                    ExtractionData.ValidationStatus.NO_DATA,
                    update_validation=True,
                )
                logger.warning("No hazard data found in response")

            return instance.id

        except requests.exceptions.RequestException:
            cls._update_instance_status(instance, ExtractionData.Status.FAILED)
            logger.error(
                "extraction failed",
                exc_info=True,
                extra={
                    "source": instance.source,
                },
            )
            raise

    @classmethod
    def extract_data(cls, start_date=None, end_date=None):
        # Set up authentication
        service_account = settings.GFD_SERVICE_ACCOUNT

        # Decode the Earth Engine credential
        decoded_json = cls.decode_json(settings.GFD_CREDENTIAL)
        credential_file_path = cls.get_json_credentials(decoded_json)

        # Authenticate
        credentials = ee.ServiceAccountCredentials(service_account, credential_file_path)
        ee.Initialize(credentials)

        # Load the Global Flood Database (GFD)
        gfd_data = ee.ImageCollection("GLOBAL_FLOOD_DB/MODIS_EVENTS/V1")

        # Filter flood events by date
        if start_date and end_date:
            gfd_data = gfd_data.filterDate(str(start_date), str(end_date))

        flood_data = cls.get_flood_data(gfd_data, batch_size=500)
        return flood_data

    @staticmethod
    @app.task
    def task(start_date=None, end_date=None):
        return GFDExtraction().handle_extraction(DATA_URL, ExtractionData.Source.GFD, start_date, end_date)
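
As `decode_json` above implies, `GFD_CREDENTIAL` is expected to hold the Earth Engine service-account JSON encoded with URL-safe Base64. A minimal sketch of producing that value (the file path shown is an assumption for illustration, not part of the repository):

```python
# Hypothetical helper for producing the GFD_CREDENTIAL value consumed by decode_json;
# "service-account.json" is an illustrative path.
import base64
import json

with open("service-account.json") as fp:
    credential = json.load(fp)

encoded = base64.urlsafe_b64encode(json.dumps(credential).encode()).decode()
print(encoded)  # export this value as GFD_CREDENTIAL
```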
1 change: 0 additions & 1 deletion apps/etl/extraction/sources/idu/extract.py
@@ -133,7 +133,6 @@ def handle_extraction(url) -> dict:
int: ID of the extraction instance
"""
logger.info("Starting IDU data extraction")
print("Starting IDU data extraction")
instance = IDUExtraction._create_extraction_instance(url=url)

try:
10 changes: 10 additions & 0 deletions apps/etl/management/commands/extract_gfd_data.py
@@ -0,0 +1,10 @@
from django.core.management.base import BaseCommand

from apps.etl.etl_tasks.gfd import ext_and_transform_gfd_historical_data


class Command(BaseCommand):
    help = "Import data from the GFD API"

    def handle(self, *args, **options):
        ext_and_transform_gfd_historical_data()
8 changes: 8 additions & 0 deletions apps/etl/tasks.py
@@ -9,6 +9,7 @@
    fetch_event_data,
    fetch_gdacs_geometry_data,
)
from apps.etl.extraction.sources.gfd.extract import GFDExtraction
from apps.etl.extraction.sources.gidd.extract import GIDDExtraction
from apps.etl.extraction.sources.glide.extract import ( # noqa: F401
import_hazard_data as import_glide_data,
@@ -20,13 +21,15 @@
    transform_geo_data,
    transform_impact_data,
)
from apps.etl.transform.sources.gfd import GFDTransformHandler # noqa: F401
from apps.etl.transform.sources.gidd import GIDDTransformHandler # noqa: F401
from apps.etl.transform.sources.glide import transform_glide_event_data # noqa: F401
from apps.etl.transform.sources.handler import BaseTransformerHandler
from apps.etl.transform.sources.idu import IDUTransformHandler # noqa: F401

IDUExtraction.handle_extraction
GIDDExtraction.handle_extraction
GFDExtraction.handle_extraction

BaseTransformerHandler.handle_transformation

@@ -56,6 +59,11 @@ def extract_gidd_data():
call_command("extract_gidd_data")


@shared_task
def extract_gfd_data():
call_command("extract_gfd_data")


@shared_task
def load_data():
call_command("load_data_to_stac")
22 changes: 22 additions & 0 deletions apps/etl/transform/sources/gfd.py
@@ -0,0 +1,22 @@
from pystac_monty.sources.gfd import GFDDataSource, GFDTransformer

from apps.etl.models import ExtractionData
from apps.etl.transform.sources.handler import BaseTransformerHandler
from main.celery import app


class GFDTransformHandler(BaseTransformerHandler):
    transformer = GFDTransformer
    transformer_schema = GFDDataSource

    @classmethod
    def get_schema_data(cls, extraction_obj: ExtractionData):
        with extraction_obj.resp_data.open() as file_data:
            data = file_data.read()

        return cls.transformer_schema(source_url=extraction_obj.url, data=data)

    @staticmethod
    @app.task
    def task(extraction_id):
        return GFDTransformHandler().handle_transformation(extraction_id)
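
A minimal sketch of re-running the transform for an already stored extraction, assuming a Celery worker is available (the id value is illustrative):

```python
# Hypothetical one-off transform run against an existing GFD extraction record.
from apps.etl.transform.sources.gfd import GFDTransformHandler

GFDTransformHandler.task.delay(123)  # 123 is an illustrative ExtractionData id
```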
3 changes: 3 additions & 0 deletions apps/etl/transform/sources/handler.py
@@ -13,6 +13,9 @@
"idu-impacts": PyStacLoadData.ItemType.IMPACT,
"idmc-events": PyStacLoadData.ItemType.EVENT,
"idmc-gidd-impacts": PyStacLoadData.ItemType.IMPACT,
"gfd-events": PyStacLoadData.ItemType.EVENT,
"gfd-impacts": PyStacLoadData.ItemType.IMPACT,
"gfd-hazards": PyStacLoadData.ItemType.HAZARD,
}


2 changes: 2 additions & 0 deletions docker-compose.yml
@@ -21,6 +21,8 @@ x-server: &base_server_setup
# ETL Sources
EMDAT_AUTHORIZATION_KEY: ${EMDAT_AUTHORIZATION_KEY?error}
IDMC_CLIENT_ID: ${IDMC_CLIENT_ID?error}
GFD_CREDENTIAL: ${GFD_CREDENTIAL?error}
GFD_SERVICE_ACCOUNT: ${GFD_SERVICE_ACCOUNT?error}
# ETL Load
EOAPI_DOMAIN: ${EOAPI_DOMAIN?error}
volumes:
2 changes: 2 additions & 0 deletions helm/templates/config/secret.yaml
@@ -18,6 +18,8 @@ stringData:
# ETL Sources
EMDAT_AUTHORIZATION_KEY: {{ required "secrets.EMDAT_AUTHORIZATION_KEY" .Values.secrets.EMDAT_AUTHORIZATION_KEY | quote }}
IDMC_CLIENT_ID: {{ required "secrets.IDMC_CLIENT_ID" .Values.secrets.IDMC_CLIENT_ID | quote }}
GFD_CREDENTIAL: {{ required "secrets.GFD_CREDENTIAL" .Values.secrets.GFD_CREDENTIAL | quote }}
GFD_SERVICE_ACCOUNT: {{ required "secrets.GFD_SERVICE_ACCOUNT" .Values.secrets.GFD_SERVICE_ACCOUNT | quote }}

# Database
{{- if .Values.postgresql.enabled }}
2 changes: 2 additions & 0 deletions helm/values-test.yaml
@@ -36,6 +36,8 @@ secrets:
# Sources
EMDAT_AUTHORIZATION_KEY: dummy-key
IDMC_CLIENT_ID: dummy-client-id
GFD_CREDENTIAL: dummy-gfd-cred
GFD_SERVICE_ACCOUNT: dummy-gfd-service-ac
secretsAdditional:
ENABLE_MAGIC_SECRET: "true"
MAGIC_KEY: to-much-fun
2 changes: 2 additions & 0 deletions helm/values.yaml
@@ -179,6 +179,8 @@ secrets:
# ETL Sources
EMDAT_AUTHORIZATION_KEY:
IDMC_CLIENT_ID:
GFD_CREDENTIAL:
GFD_SERVICE_ACCOUNT:
# NOTE: Used to pass additional secrets to api/worker containers
# NOTE: Not used by azure vault
secretsAdditional:
10 changes: 10 additions & 0 deletions main/settings.py
@@ -72,8 +72,14 @@
    IDMC_DATA_URL=(str, "https://helix-tools-api.idmcdb.org"),
    # ETL Load configs
    EOAPI_DOMAIN=str,  # http://montandon-eoapi.ifrc.org
    GFD_CREDENTIAL=str,
    GFD_SERVICE_ACCOUNT=str,
)

GFD_SERVICE_ACCOUNT = env("GFD_SERVICE_ACCOUNT")

GFD_CREDENTIAL = env("GFD_CREDENTIAL")

EMDAT_AUTHORIZATION_KEY = env("EMDAT_AUTHORIZATION_KEY")

IDMC_CLIENT_ID = env("IDMC_CLIENT_ID")
@@ -328,6 +334,10 @@
"task": "apps.etl.tasks.extract_usgs_data",
"schedule": crontab(minute=0, hour=0), # This task execute daily at 12 AM (UTC)
},
"import_gfd_data": {
"task": "apps.etl.etl_tasks.ext_and_transform_gfd_latest_data",
"schedule": crontab(minute=0, hour=0), # This task execute daily at 12 AM (UTC)
},
"load_data_to_stac": {
"task": "apps.etl.tasks.load_data",
"schedule": crontab(minute=0, hour=0), # TODO Set time to run this job
1 change: 1 addition & 0 deletions pyproject.toml
@@ -26,6 +26,7 @@ dependencies = [
"sentry-sdk",
"ipython",
"uwsgi",
"earthengine-api>=1.5.1",
]

[tool.uv.sources]