import copy
import json
from zipfile import BadZipFile

import ijson
from scrapy.exceptions import DropItem
from scrapy.utils.log import logformatter_adapter

from kingfisher_scrapy import util
from kingfisher_scrapy.exceptions import RetryableError
from kingfisher_scrapy.items import File, FileItem

MAX_GROUP_SIZE = 100


# Avoid reading the rest of a large file, since the rest of the items will be dropped.
def sample_filled(spider, number):
    return spider.sample and number > spider.sample


def group_size(spider):
    if spider.sample:
        return min(spider.sample, MAX_GROUP_SIZE)
    return MAX_GROUP_SIZE

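
# A sketch of how these helpers interact (invented values, not from this module's tests): with
# spider.sample = 5, group_size(spider) returns 5, and sample_filled(spider, number) is truthy once
# number exceeds 5, so the middlewares below stop reading the rest of a file once the sample is filled.
# Without a sample, group_size(spider) returns MAX_GROUP_SIZE (100) and sample_filled() is always falsy.

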
def read_data_from_file_if_any(item):
    if hasattr(item.data, 'read'):
        content = item.data.read()
        item.data.close()
        item.data = content


class ConcatenatedJSONMiddleware:
    """
    If the spider's ``concatenated_json`` class attribute is ``True``, yields each object of the File as a FileItem.
    Otherwise, yields the original item.
    """

    async def process_spider_output(self, response, result, spider):
        """
        :returns: a generator of FileItem objects, in which the ``data`` field is parsed JSON
        """
        async for item in result:
            if not isinstance(item, File) or not spider.concatenated_json:
                yield item
                continue

            data = item.data

            # ijson can read from bytes or a file-like object.
            for number, obj in enumerate(util.transcode(spider, ijson.items, data, '', multiple_values=True), 1):
                if sample_filled(spider, number):
                    return
                yield spider.build_file_item(number, obj, item)


class LineDelimitedMiddleware:
    """
    If the spider's ``line_delimited`` class attribute is ``True``, yields each line of the File as a FileItem.
    Otherwise, yields the original item.
    """

    async def process_spider_output(self, response, result, spider):
        """
        :returns: a generator of FileItem objects, in which the ``data`` field is bytes
        """
        async for item in result:
            if not isinstance(item, File) or not spider.line_delimited:
                yield item
                continue

            data = item.data

            # Data can be bytes or a file-like object. If bytes, split into an iterable.
            if isinstance(data, bytes):
                data = data.splitlines(keepends=True)

            for number, line in enumerate(data, 1):
                if sample_filled(spider, number):
                    return
                yield spider.build_file_item(number, line, item)


class ValidateJSONMiddleware:
    """
    If the spider's ``validate_json`` class attribute is ``True``, checks whether the item's ``data`` field is valid
    JSON; if not, drops the item. Otherwise, yields the original item.
    """

    async def process_spider_output(self, response, result, spider):
        """
        :returns: a generator of File or FileItem objects, in which the ``data`` field is valid JSON
        """
        async for item in result:
            if not isinstance(item, (File, FileItem)) or not spider.validate_json or isinstance(item.data, dict):
                yield item
                continue

            read_data_from_file_if_any(item)

            try:
                json.loads(item.data)
                yield item
            except json.JSONDecodeError:
                spider.crawler.stats.inc_value('invalid_json_count')
                # https://github.com/scrapy/scrapy/blob/48c5a8c/scrapy/core/scraper.py#L364-L367
                logkws = spider.crawler.logformatter.dropped(item, DropItem('Invalid JSON'), response, spider)
                if logkws is not None:
                    spider.logger.log(*logformatter_adapter(logkws), extra={'spider': spider})


class RootPathMiddleware:
    """
    If the spider's ``root_path`` class attribute is non-empty, replaces the item's ``data`` with the objects at that
    prefix; if there are multiple releases, records or packages at that prefix, combines them into packages in groups
    of 100, and updates the item's ``data_type`` if needed. Otherwise, yields the original item.
    """

    async def process_spider_output(self, response, result, spider):
        """
        :returns: a generator of File or FileItem objects, in which the ``data`` field is parsed JSON
        """
        async for item in result:
            if not isinstance(item, (File, FileItem)) or not spider.root_path:
                yield item
                continue

            data = item.data
            # Re-encode the data, to traverse the JSON using only ijson, instead of either ijson or Python.
            # This is only expected to occur when both `root_path` and `concatenated_json` are set.
            if isinstance(data, dict):
                data = util.json_dumps(data).encode()

            iterable = util.transcode(spider, ijson.items, data, spider.root_path)

            if 'item' in spider.root_path.split('.'):
                # Two common issues in OCDS data are:
                #
                # - Multiple releases or records, without a package
                # - Multiple packages in a single file (often with a single release, record or OCID per package)
                #
                # Yielding each release, record or package creates a lot of overhead in terms of the number of
                # files written, the number of messages in RabbitMQ and the number of rows in PostgreSQL.
                #
                # We re-package in groups of 100 to reduce the overhead.
                is_package = 'package' in item.data_type
                if 'release' in item.data_type:
                    key = 'releases'
                    item.data_type = 'release_package'
                else:
                    key = 'records'
                    item.data_type = 'record_package'

                for number, items in enumerate(util.grouper(iterable, group_size(spider)), 1):
                    if sample_filled(spider, number):
                        return
                    # Omit the None values returned by `grouper(*, fillvalue=None)`.
                    items = filter(None, items)
                    if is_package:
                        # Assume that the `extensions` are the same for all packages.
                        package = next(items)
                        releases_or_records = package[key]
                        for other in items:
                            try:
                                releases_or_records.extend(other[key])
                            except KeyError as e:
                                spider.logger.warning('%(key)s not set in %(data)r', {'key': e, 'data': other})
                    else:
                        package = {'version': spider.ocds_version, key: list(items)}

                    yield spider.build_file_item(number, package, item)
            else:
                # Iterates at most once.
                for number, obj in enumerate(iterable, 1):
                    if sample_filled(spider, number):
                        return
                    item.data = obj
                    yield item


class AddPackageMiddleware:
    """
    If the spider's ``data_type`` class attribute is "release" or "record", wraps the item's ``data`` in an
    appropriate package, and updates the item's ``data_type``. Otherwise, yields the original item.
    """

    async def process_spider_output(self, response, result, spider):
        """
        :returns: a generator of File or FileItem objects, in which the ``data`` field is parsed JSON
        """
        async for item in result:
            if not isinstance(item, (File, FileItem)) or item.data_type not in ('release', 'record'):
                yield item
                continue

            read_data_from_file_if_any(item)

            data = item.data
            # If the spider's ``root_path`` class attribute is non-empty, then the JSON data is already parsed.
            if isinstance(data, bytes):
                data = json.loads(data)

            if item.data_type == 'release':
                key = 'releases'
            else:
                key = 'records'
            item.data = {'version': spider.ocds_version, key: [data]}
            item.data_type += '_package'

            yield item


class ResizePackageMiddleware:
    """
    If the spider's ``resize_package`` class attribute is ``True``, splits the package into packages of at most 100
    releases or records each. Otherwise, yields the original item.
    """

    async def process_spider_output(self, response, result, spider):
        """
        The spider must yield items whose ``data`` field has ``package`` and ``data`` keys.

        :returns: a generator of FileItem objects, in which the ``data`` field is parsed JSON
        """
        async for item in result:
            if not isinstance(item, File) or not getattr(spider, 'resize_package', False):
                yield item
                continue

            data = item.data
            if item.data_type == 'release_package':
                key = 'releases'
            else:
                key = 'records'

            template = self._get_package_metadata(spider, data['package'], key, item.data_type)
            iterable = util.transcode(spider, ijson.items, data['data'], f'{key}.item')

            for number, items in enumerate(util.grouper(iterable, group_size(spider)), 1):
                if sample_filled(spider, number):
                    return
                package = copy.deepcopy(template)
                # Omit the None values returned by `grouper(*, fillvalue=None)`.
                package[key] = list(filter(None, items))

                yield spider.build_file_item(number, package, item)

    def _get_package_metadata(self, spider, data, skip_key, data_type):
        """
        Returns the package metadata from a file object.

        :param data: the file object or bytes from which to read the package metadata
        :param str skip_key: the key to skip ('releases' or 'records')
        :returns: the package metadata
        :rtype: dict
        """
        package = {}
        if 'package' in data_type:
            for item in util.items(util.transcode(spider, ijson.parse, data), '', skip_key=skip_key):
                package.update(item)
        return package


class ReadDataMiddleware:
    """
    If the item's ``data`` is a file descriptor, replaces the item's ``data`` with the file's contents and closes the
    file descriptor. Otherwise, yields the original item.

    .. seealso::

       :class:`~kingfisher_scrapy.base_spiders.compressed_file_spider.CompressedFileSpider`
    """

    async def process_spider_output(self, response, result, spider):
        """
        :returns: a generator of File objects, in which the ``data`` field is bytes
        """
        async for item in result:
            if not isinstance(item, File) or not hasattr(item.data, 'read'):
                yield item
                continue

            read_data_from_file_if_any(item)
            yield item


class RetryDataErrorMiddleware:
    """
    Retries a request up to 3 times, when the spider raises a ``BadZipFile`` exception (on the assumption that the
    response was truncated) or a ``RetryableError`` exception.
    """

    # https://docs.scrapy.org/en/latest/topics/spider-middleware.html#scrapy.spidermiddlewares.SpiderMiddleware.process_spider_exception
    def process_spider_exception(self, response, exception, spider):
        if isinstance(exception, (BadZipFile, RetryableError)):
            attempts = response.request.meta.get('retries', 0) + 1
            if attempts > 3:
                spider.logger.error('Gave up retrying %(request)s (failed %(failures)d times): %(exception)s',
                                    {'request': response.request, 'failures': attempts, 'exception': exception})
                return

            request = response.request.copy()
            request.dont_filter = True
            request.meta['retries'] = attempts

            spider.logger.debug('Retrying %(request)s (failed %(failures)d times): %(exception)s',
                                {'request': response.request, 'failures': attempts, 'exception': exception})

            yield request
        else:
            raise exception