# https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# https://docs.scrapy.org/en/latest/topics/signals.html#item-signals
import json
import os
import tempfile
import warnings

import ijson
import jsonpointer
from flattentool import unflatten
from ocdsmerge.util import get_release_schema_url, get_tags
from scrapy.exceptions import DropItem, NotSupported

from kingfisher_scrapy.items import File, FileItem, PluckedItem
from kingfisher_scrapy.util import transcode


# https://docs.scrapy.org/en/latest/topics/item-pipeline.html#duplicates-filter
class Validate:
    """
    Drops duplicate files based on ``file_name`` and file items based on ``file_name`` and ``number``.
    """

    def __init__(self):
        self.files = set()
        self.file_items = set()

    def process_item(self, item, spider):
        if isinstance(item, FileItem):
            key = (item.file_name, item.number)
            if key in self.file_items:
                raise DropItem(f'Duplicate FileItem: {key!r}')
            else:
                self.file_items.add(key)
        elif isinstance(item, File):
            key = item.file_name
            if key in self.files:
                raise DropItem(f'Duplicate File: {key!r}')
            else:
                self.files.add(key)

        return item
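

# A rough usage sketch, not part of this module: like any Scrapy item pipeline, these classes
# are assumed to be enabled via the project's ITEM_PIPELINES setting, for example:
#
#   ITEM_PIPELINES = {
#       'kingfisher_scrapy.pipelines.Validate': 300,
#   }
#
# The integer sets the order in which pipelines run. The dotted path and the priority value
# shown here are illustrative, not necessarily this project's actual configuration.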


class Sample:
    """
    Drops items and closes the spider once the sample size is reached.
    """

    def __init__(self):
        self.item_count = 0

    def process_item(self, item, spider):
        if not spider.sample:
            return item

        # Drop FileError items, so that we keep trying to get data.
        if not isinstance(item, (File, FileItem)):
            raise DropItem('Item is not a File or FileItem')

        if self.item_count >= spider.sample:
            spider.crawler.engine.close_spider(spider, 'sample')
            raise DropItem('Maximum sample size reached')

        self.item_count += 1
        return item

    def open_spider(self, spider):
        if spider.sample:
            spider.crawler.engine.downloader.total_concurrency = 1
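

# A rough sketch of the assumed usage (the spider name and value are illustrative):
# ``spider.sample`` is an integer threshold, typically supplied as a spider argument, e.g.
#
#   scrapy crawl some_spider -a sample=10
#
# with the spider assumed to convert the argument to an int. At most that many File/FileItem
# items are then kept before the spider is closed with the 'sample' reason.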


class Pluck:
    """
    Extracts a value from the item and returns it as a plucked item.
    """

    def process_item(self, item, spider):
        if not spider.pluck:
            return item

        value = None

        if spider.pluck_package_pointer:
            pointer = spider.pluck_package_pointer
            if isinstance(item.data, dict):
                value = _resolve_pointer(item.data, pointer)
            else:
                try:
                    value = next(transcode(spider, ijson.items, item.data, pointer[1:].replace('/', '.')))
                except StopIteration:
                    value = f'error: {pointer} not found'
                except ijson.common.IncompleteJSONError as e:
                    message = str(e).split('\n', 1)[0]
                    if message.endswith((
                        # Python backend.
                        'Incomplete JSON content',
                        # The JSON text can be truncated by a `bytes_received` handler.
                        'premature EOF',
                        # These messages occur if the JSON text is truncated at `"\u` or `"\`.
                        r"lexical error: invalid (non-hex) character occurs after '\u' inside string.",
                        r"lexical error: inside a string, '\' occurs before a character which it may not.",
                    )):
                        value = f'error: {pointer} not found within initial bytes'
                    else:
                        raise
        else:  # spider.pluck_release_pointer
            if isinstance(item.data, dict):
                data = item.data
            else:
                data = json.loads(item.data)

            if item.data_type.startswith('release'):
                releases = data['releases']
                if releases:
                    value = max(_resolve_pointer(r, spider.pluck_release_pointer) for r in releases)
            elif item.data_type.startswith('record'):
                records = data['records']
                if records:
                    # This assumes that the first record in the record package has the desired value.
                    record = records[0]
                    if 'releases' in record:
                        value = max(_resolve_pointer(r, spider.pluck_release_pointer) for r in record['releases'])
                    elif 'compiledRelease' in record:
                        value = _resolve_pointer(record['compiledRelease'], spider.pluck_release_pointer)

        if value and spider.pluck_truncate:
            value = value[:spider.pluck_truncate]

        return PluckedItem(value=value)
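

# A minimal sketch of the pointer-to-prefix translation above, using illustrative data: a
# JSON Pointer such as '/publishedDate' is turned into the ijson prefix 'publishedDate', and
# ijson streams the first matching value without parsing the whole document, e.g.
#
#   >>> import io, ijson
#   >>> next(ijson.items(io.BytesIO(b'{"publishedDate": "2001-02-03T00:00:00Z"}'), 'publishedDate'))
#   '2001-02-03T00:00:00Z'
#
# In the pipeline itself, ``transcode`` wraps this call, presumably so that item data in
# non-UTF-8 encodings can still be streamed.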


class Unflatten:
    """
    Converts an item's data from CSV/XLSX to JSON, using the ``unflatten`` function from Flatten Tool.
    """

    def process_item(self, item, spider):
        if not spider.unflatten or not isinstance(item, (File, FileItem)):
            return item

        input_name = item.file_name
        if input_name.endswith('.csv'):
            item.file_name = f'{item.file_name[:-4]}.json'
            input_format = 'csv'
        elif input_name.endswith('.xlsx'):
            item.file_name = f'{item.file_name[:-5]}.json'
            input_format = 'xlsx'
        else:
            extension = os.path.splitext(input_name)[1]
            raise NotSupported(f"Unsupported extension '{extension}' of {input_name} from {item.url}")

        spider_ocds_version = spider.ocds_version.replace('.', '__')
        for tag in reversed(get_tags()):
            if tag.startswith(spider_ocds_version):
                schema = get_release_schema_url(tag)
                break
        else:
            raise NotSupported(f"Unsupported version '{spider_ocds_version}' from {spider.ocds_version}")

        with tempfile.TemporaryDirectory() as directory:
            input_path = os.path.join(directory, input_name)
            output_name = os.path.join(directory, item.file_name)

            if input_format == 'csv':
                input_name = directory
            elif input_format == 'xlsx':
                input_name = input_path

            with open(input_path, 'wb') as f:
                f.write(item.data)

            with warnings.catch_warnings():
                warnings.filterwarnings('ignore')  # flattentool uses UserWarning, so we can't set a specific category
                unflatten(
                    input_name,
                    root_list_path='releases',
                    root_id='ocid',
                    schema=schema,
                    input_format=input_format,
                    output_name=output_name,
                    **spider.unflatten_args
                )

            with open(output_name, 'rb') as f:
                item.data = f.read()

        return item
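

# A short worked example of the tag-matching loop above, with assumed values: for a spider
# declaring ``ocds_version = '1.1'``, the prefix becomes '1__1', and the newest tag returned
# by get_tags() that starts with '1__1' (for example '1__1__5') is used to look up the
# release schema URL passed to Flatten Tool. The specific tag value is illustrative.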


def _resolve_pointer(data, pointer):
    try:
        return jsonpointer.resolve_pointer(data, pointer)
    except jsonpointer.JsonPointerException:
        return f'error: {pointer} not found'
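

# For example, with illustrative values:
#
#   >>> _resolve_pointer({'date': '2001-02-03'}, '/date')
#   '2001-02-03'
#   >>> _resolve_pointer({}, '/date')
#   'error: /date not found'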