refactor desy harvest
MJedr committed Sep 13, 2023
1 parent e645cc3 commit 86b34a5
Showing 5 changed files with 71 additions and 52 deletions.
hepcrawl/spiders/desy_spider.py (56 changes: 34 additions & 22 deletions)
@@ -122,7 +122,7 @@ def connect_to_s3(self):
         return connections
 
     def s3_url(self, s3_file=None, expire=86400):
-        return self.s3_url_for_file(s3_file.key, bucket=s3_file.bucket_name, expire=expire)
+        return self.s3_url_for_file(s3_file, expire=expire)
 
     def s3_url_for_file(self, file_name, bucket=None, expire=7776000):
         bucket = bucket or self.s3_input_bucket
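Note: s3_url() now receives the object key directly (crawl() passes the full path of the subdirectory's .jsonl file) instead of a boto3 ObjectSummary, and simply delegates to s3_url_for_file(), which presigns a time-limited GET against the input bucket. A minimal standalone sketch of the presigning call it wraps, with an illustrative endpoint, bucket and key:

    import boto3

    # Illustrative client; the spider builds its own in connect_to_s3().
    s3_client = boto3.client('s3', endpoint_url='https://s3.example.org')

    # Roughly what s3_url_for_file('subdir/subdir.jsonl') produces: a URL that
    # grants GET access to the object for ExpiresIn seconds.
    url = s3_client.generate_presigned_url(
        'get_object',
        Params={'Bucket': 'incoming-bucket', 'Key': 'subdir/subdir.jsonl'},
        ExpiresIn=86400,
    )
    print(url)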
@@ -132,28 +132,30 @@ def s3_url_for_file(self, file_name, bucket=None, expire=7776000):
             ExpiresIn=expire
         )
 
-    def crawl_s3_bucket(self):
-        input_bucket = self.s3_resource.Bucket(self.s3_input_bucket)
-        for s3_file in input_bucket.objects.all():
-            if not s3_file.key.endswith('.jsonl'):
-                # this is a document referenced in an jsonl file, it will be
-                # processed when dealing with attached documents
-                continue
-
-            self.logger.info("Remote: Try to crawl file from s3: {file}".format(file=s3_file.key))
+    def crawl(self):
+        objects_metadata = self.s3_resource.meta.client.list_objects_v2(
+            Bucket=self.s3_input_bucket, Delimiter='/'
+        )
+        for subdir in objects_metadata.get('CommonPrefixes'):
+            subdir_name = subdir.get('Prefix')
             try:
-                self.s3_resource.Object(self.s3_output_bucket, s3_file.key).load()
-                self.logger.info("File %s was already processed!", s3_file.key)
-            except ClientError: # Process it only if file is not in output_bucket
+                self.s3_resource.Object(self.s3_output_bucket, subdir_name).load()
+            except ClientError:
+                jsonl_file_name = "{}.jsonl".format(subdir_name.strip('/'))
+                jsonl_s3_path = "{}{}".format(subdir_name, jsonl_file_name)
                 yield Request(
-                    self.s3_url(s3_file),
-                    meta={"s3_file": s3_file.key},
+                    self.s3_url(jsonl_s3_path),
+                    meta={"s3_file": jsonl_file_name, "s3_subdirectory": subdir_name},
                     callback=self.parse
                 )
 
+    def handle_not_found_jsonl(self, response):
+        self.logger.error("jsonl file %s was not found in subdirectory!", response.meta['s3_file'])
+        return
+
     def start_requests(self):
         """List selected bucket on s3 and yield files."""
-        requests = self.crawl_s3_bucket()
+        requests = self.crawl()
 
         for request in requests:
             yield request
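For context, the rewritten crawl() treats each top-level S3 prefix ("subdirectory") as one harvest unit: list_objects_v2 with Delimiter='/' groups keys into CommonPrefixes, the output bucket is checked for an object under the subdirectory's key so already-processed subdirectories are skipped, and a request is scheduled for that subdirectory's .jsonl file. A rough standalone sketch of the listing pattern, with illustrative bucket names:

    import boto3
    from botocore.exceptions import ClientError

    s3 = boto3.resource('s3', endpoint_url='https://s3.example.org')

    # Group the input bucket's top-level keys into "subdirectories".
    listing = s3.meta.client.list_objects_v2(Bucket='incoming-bucket', Delimiter='/')
    for subdir in listing.get('CommonPrefixes', []):
        prefix = subdir['Prefix']  # e.g. 'desy_collection_1/'
        try:
            # Object.load() raises ClientError (404) when nothing with this key
            # exists in the output bucket, i.e. the subdirectory is unprocessed.
            s3.Object('processed-bucket', prefix).load()
        except ClientError:
            jsonl_key = '{0}{1}.jsonl'.format(prefix, prefix.strip('/'))
            print('would schedule a crawl request for', jsonl_key)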
@@ -165,9 +167,21 @@ def _is_local_path(cls, url):
 
     def _get_full_uri(self, file_name, schema='https'):
         file_name = file_name.lstrip('/api/files/')
-        self.move_file_to_processed(file_name)
         url = self.s3_url_for_file(file_name, bucket=self.s3_output_bucket)
         return url
 
+    def move_all_files_for_subdirectory(self, prefix):
+        import pdb
+        pdb.set_trace()
+        delimiter = '/'
+
+        files_to_move = self.s3_resource.meta.client.list_objects_v2(
+            Bucket=self.s3_input_bucket, Prefix=prefix, Delimiter=delimiter
+        )
+
+        for file in files_to_move['Contents']:
+            file_key = file['Key']
+            self.move_file_to_processed(file_key)
+
     def parse(self, response):
         """Parse a ``Desy`` jsonl file into a :class:`hepcrawl.utils.ParsedItem`.
@@ -204,6 +218,9 @@ def parse(self, response):
             file_name=file_name
         )
 
+        self.move_all_files_for_subdirectory(
+            prefix=response.meta['s3_subdirectory']
+        )
         # make sure we don't yield control to scrapy while an app context is
         # running to ensure different contexts don't get interleaved which
         # Flask doesn't handle correctly (single global app context stack)
@@ -213,10 +230,6 @@ def parse(self, response):
 
         self.logger.info('Processed all JSON records in %s', file_name)
 
-        if "s3_file" in response.meta:
-            s3_file = response.meta['s3_file']
-            self.move_file_to_processed(file_name=s3_file)
-
 
     def move_file_to_processed(self, file_name, file_bucket=None, output_bucket=None):
         file_bucket = file_bucket or self.s3_input_bucket
@@ -256,7 +269,6 @@ def _parsed_items_from_json(
 
             if new_documents:
                 parsed_item.record['documents'] = new_documents
-
             parsed_item.file_urls = files_to_download
             self.logger.info('Got the following attached documents to download: %s', files_to_download)
             self.logger.info('Got item: %s', parsed_item)
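To finish a subdirectory, parse() now calls move_all_files_for_subdirectory(), which lists every object under the given prefix and hands each key to the pre-existing move_file_to_processed() helper. S3 has no native move operation, so a move of this kind is conventionally a copy to the destination bucket followed by a delete of the source object; a hedged sketch of that generic pattern (bucket and prefix names are illustrative, and this is not the helper's actual implementation):

    import boto3

    client = boto3.client('s3', endpoint_url='https://s3.example.org')

    # Everything under the subdirectory: the .jsonl file plus attached documents.
    listing = client.list_objects_v2(
        Bucket='incoming-bucket', Prefix='desy_collection_1/', Delimiter='/'
    )
    for obj in listing.get('Contents', []):
        key = obj['Key']
        # Generic S3 "move": copy to the target bucket, then delete the original.
        client.copy_object(
            Bucket='processed-bucket',
            Key=key,
            CopySource={'Bucket': 'incoming-bucket', 'Key': key},
        )
        client.delete_object(Bucket='incoming-bucket', Key=key)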
hepcrawl/testlib/fixtures.py (5 changes: 2 additions & 3 deletions)
@@ -19,7 +19,7 @@
 from hepcrawl.settings import CRAWL_ONCE_PATH
 
 
-def fake_response_from_file(file_name, test_suite='unit', url='http://www.example.com', response_type=TextResponse):
+def fake_response_from_file(file_name, test_suite='unit', url='http://www.example.com', response_type=TextResponse, response_meta={}):
     """Create a Scrapy fake HTTP response from a HTML file
 
     Args:
@@ -34,7 +34,7 @@ def fake_response_from_file(file_name, test_suite='unit', url='http://www.exampl
     Returns:
         ``response_type``: A scrapy HTTP response which can be used for unit testing.
     """
-    request = Request(url=url)
+    request = Request(url=url, meta=response_meta)
 
     if not file_name[0] == '/':
         file_path = get_test_suite_path(
@@ -53,7 +53,6 @@ def fake_response_from_file(file_name, test_suite='unit', url='http://www.exampl
         body=file_content,
         **{'encoding': 'utf-8'}
     )
-
     return response
 
 
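With the new response_meta argument, tests can attach request meta (such as the s3_subdirectory key the DESY spider now reads) to the faked response. A small, illustrative usage sketch (the fixture name is hypothetical); note that the mutable {} default is shared between calls, so tests that need meta should pass their own dict:

    from scrapy.http import TextResponse

    from hepcrawl.testlib.fixtures import fake_response_from_file

    response = fake_response_from_file(
        file_name='desy/desy_collection_records.jsonl',  # hypothetical fixture
        response_type=TextResponse,
        response_meta={'s3_subdirectory': 'desy_collection_records/'},
    )
    assert response.meta['s3_subdirectory'] == 'desy_collection_records/'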
tests/Dockerfile.hepcrawl_py3 (1 change: 0 additions & 1 deletion)
@@ -12,4 +12,3 @@ ENV PATH="/home/test/.local/bin:${PATH}"
 WORKDIR /code
 RUN pip install --upgrade wheel setuptools idutils rfc3987 bleach jsonschema inspire-utils
 RUN pip install --no-cache-dir -e .[all]
-
tests/functional/desy/test_desy.py (6 changes: 5 additions & 1 deletion)
@@ -87,9 +87,13 @@ def setup_s3_files(s3_key, s3_secret, s3_server, buckets=[], files_to_upload=[],
     )
     transfer_config = TransferConfig(use_threads=False)
     for bucket_name, file_name in files_to_upload:
+        file_root = file_name.split('.jsonl')[0]
         buckets_map[bucket_name].upload_file(
             Filename=os.path.join(test_files_path, file_name),
-            Key=file_name, Config=transfer_config
+            Key='{subdirectory}/{filename}'.format(
+                subdirectory=file_root,
+                filename=file_name
+            ), Config=transfer_config
         )
 
 
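The functional-test fixtures are now uploaded under a per-file subdirectory, matching the bucket layout the refactored crawl() expects. For a hypothetical fixture name the key works out as follows:

    file_name = 'desy_collection_records1.jsonl'  # hypothetical fixture name
    file_root = file_name.split('.jsonl')[0]
    key = '{subdirectory}/{filename}'.format(
        subdirectory=file_root, filename=file_name
    )
    assert key == 'desy_collection_records1/desy_collection_records1.jsonl'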
tests/unit/test_desy.py (55 changes: 30 additions & 25 deletions)
@@ -11,6 +11,7 @@
 
 import os
 
+import mock
 import pytest
 from deepdiff import DeepDiff
 from mock import MagicMock
@@ -21,6 +22,7 @@
 from hepcrawl import settings
 from hepcrawl.pipelines import InspireCeleryPushPipeline
 from hepcrawl.spiders import desy_spider
+from hepcrawl.spiders.desy_spider import DesySpider
 from hepcrawl.testlib.fixtures import (
     expected_json_results_from_file,
     fake_response_from_file,
@@ -47,24 +49,24 @@ def get_records(response_file_name):
     # environmental variables needed for the pipelines payload
     os.environ['SCRAPY_JOB'] = 'scrapy_job'
     os.environ['SCRAPY_FEED_URI'] = 'scrapy_feed_uri'
-
-    spider = create_spider()
-    records = spider.parse(
-        fake_response_from_file(
-            file_name='desy/' + response_file_name,
-            response_type=TextResponse
-        )
-    )
-
-    pipeline = InspireCeleryPushPipeline()
-    pipeline.open_spider(spider)
+    with mock.patch('hepcrawl.spiders.desy_spider.DesySpider.move_all_files_for_subdirectory'):
+        spider = create_spider()
+        records = list(spider.parse(
+            fake_response_from_file(
+                file_name='desy/' + response_file_name,
+                response_type=TextResponse,
+                response_meta={"s3_subdirectory": response_file_name.strip('.jsonl') + '/'}
+            )
+        ))
+        pipeline = InspireCeleryPushPipeline()
+        pipeline.open_spider(spider)
 
-    return (
-        pipeline.process_item(
-            record,
-            spider
-        )['record'] for record in records
-    )
+        return (
+            pipeline.process_item(
+                record,
+                spider
+            )['record'] for record in records
+        )
 
 
 def get_one_record(response_file_name):
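Because parse() now calls move_all_files_for_subdirectory(), the unit tests patch that method out so no S3 traffic happens; get_records() above does this with an inline with-block. The same effect can be had with the decorator form, sketched here with an illustrative test body:

    import mock

    from hepcrawl.spiders.desy_spider import DesySpider


    @mock.patch.object(DesySpider, 'move_all_files_for_subdirectory')
    def test_example(mocked_move):
        # While patched, the spider method is a MagicMock: calling it records
        # the call instead of listing and moving S3 objects.
        DesySpider.move_all_files_for_subdirectory(prefix='some-subdir/')
        mocked_move.assert_called_once_with(prefix='some-subdir/')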
@@ -116,11 +118,14 @@ def test_pipeline(generated_records, expected_records):
 
 
 def test_invalid_jsonll():
-    spider = create_spider()
-    response = MagicMock()
-    response.url = "https://s3.cern.ch/incoming-bucket/invalid_record.jsonl"
-    response.text = "This is not actually JSONL\n"
-    result = list(spider.parse(response))
-    assert result[0].exception.startswith('ValueError')
-    assert result[0].traceback is not None
-    assert result[0].source_data == "This is not actually JSONL"
+    with mock.patch('hepcrawl.spiders.desy_spider.DesySpider.move_all_files_for_subdirectory'):
+        spider = create_spider()
+        response = MagicMock()
+        response.url = "https://s3.cern.ch/incoming-bucket/invalid_record.jsonl"
+        response.text = "This is not actually JSONL\n"
+        response.meta = {"s3_subdirectory": 'invalid_record'}
+
+        result = list(spider.parse(response))
+        assert result[0].exception.startswith('ValueError')
+        assert result[0].traceback is not None
+        assert result[0].source_data == "This is not actually JSONL"
