import copy
import logging
from zipfile import BadZipFile
import ijson
import orjson
from scrapy.exceptions import DropItem
from scrapy.spidermiddlewares.httperror import HttpError
from scrapy.utils.asyncio import run_in_thread
from scrapy.utils.log import logformatter_adapter
from kingfisher_scrapy import util
from kingfisher_scrapy.exceptions import RetryableError
from kingfisher_scrapy.items import File, FileItem
logger = logging.getLogger(__name__)
# Upper bound on how many releases, records or packages are combined into a single package (see `group_size`).
MAX_GROUP_SIZE = 100
# Avoid reading the rest of a large file, since the rest of the items will be dropped.
[docs]
def sample_filled(spider, number):
    """
    Return whether ``number`` exceeds the spider's ``sample`` size.

    Lets callers stop reading the rest of a large file, since the remaining items would be dropped anyway.
    If ``spider.sample`` is falsy (e.g. ``None`` or ``0``), that falsy value is returned unchanged.
    """
    limit = spider.sample
    if not limit:
        return limit
    return number > limit
[docs]
def group_size(spider):
    """
    Return how many releases, records or packages to combine into one package.

    This is ``MAX_GROUP_SIZE``, capped at the spider's ``sample`` size when sampling.
    """
    limit = spider.sample
    return min(limit, MAX_GROUP_SIZE) if limit else MAX_GROUP_SIZE
[docs]
def read_data_from_file_if_any(item):
    """
    Replace a file-like ``item.data`` with its contents, closing the file.

    If ``item.data`` has no ``read`` method, the item is left unchanged.

    The file is closed even if reading fails, in which case the exception propagates and ``item.data`` keeps the
    (now closed) file object.
    """
    if not hasattr(item.data, "read"):
        return
    stream = item.data
    try:
        content = stream.read()
    finally:
        # Release the file descriptor on both the success and the failure path.
        stream.close()
    item.data = content
[docs]
class BaseSpiderMiddleware:
    """
    Base class for spider middlewares that need access to the spider instance.

    Stores the crawler's ``spider``, ``logformatter`` and ``stats`` on the middleware for use by subclasses.
    """

    def __init__(self, crawler):
        self.spider, self.logformatter, self.stats = crawler.spider, crawler.logformatter, crawler.stats

    @classmethod
    def from_crawler(cls, crawler):
        """Build the middleware from the crawler (Scrapy's factory hook)."""
        return cls(crawler)
[docs]
class ConcatenatedJSONMiddleware(BaseSpiderMiddleware):
    """
    If the spider's ``concatenated_json`` class attribute is ``True``, yield each object of the File as a FileItem.

    Otherwise, yield the original item.
    """

    async def process_spider_output(self, response, result):
        """Return a generator of FileItem objects, in which the ``data`` field is parsed JSON."""
        async for item in result:
            if self.spider.concatenated_json and isinstance(item, File):
                # ijson can read from bytes or a file-like object.
                objects = util.transcode(self.spider, ijson.items, item.data, "", multiple_values=True)
                for number, obj in enumerate(objects, 1):
                    # Stop the whole generator once the sample size is exceeded.
                    if sample_filled(self.spider, number):
                        return
                    yield self.spider.build_file_item(number, obj, item)
            else:
                yield item
[docs]
class LineDelimitedMiddleware(BaseSpiderMiddleware):
    """
    If the spider's ``line_delimited`` class attribute is ``True``, yield each line of the File as a FileItem.

    Otherwise, yield the original item.
    """

    async def process_spider_output(self, response, result):
        """Return a generator of FileItem objects, in which the ``data`` field is bytes."""
        async for item in result:
            if not (self.spider.line_delimited and isinstance(item, File)):
                yield item
                continue

            source = item.data
            # Data can be bytes or a file-like object: bytes are split into lines (keeping line endings),
            # anything else is iterated directly.
            lines = source.splitlines(keepends=True) if isinstance(source, bytes) else source

            for number, line in enumerate(lines, 1):
                # Stop the whole generator once the sample size is exceeded.
                if sample_filled(self.spider, number):
                    return
                yield self.spider.build_file_item(number, line, item)
[docs]
class ValidateJSONMiddleware(BaseSpiderMiddleware):
    """
    If the spider's ``validate_json`` class attribute is ``True``, check if the item's ``data`` field is valid
    JSON. If not, yield nothing. Otherwise, yield the original item.

    Items whose ``data`` is already a ``dict`` are passed through without validation.
    """

    async def process_spider_output(self, response, result):
        """Return a generator of File or FileItem objects, in which the ``data`` field is valid JSON."""
        async for item in result:
            if not self.spider.validate_json or not isinstance(item, File | FileItem) or isinstance(item.data, dict):
                yield item
                continue

            read_data_from_file_if_any(item)

            # Keep the `try` body to the parse only, so that nothing raised at the `yield` is mistaken for
            # invalid JSON.
            try:
                orjson.loads(item.data)
            except orjson.JSONDecodeError:
                self.stats.inc_value("invalid_json_count")
                # https://github.com/scrapy/scrapy/blob/49930df/scrapy/core/scraper.py#L504-L508
                logkws = self.logformatter.dropped(item, DropItem("Invalid JSON"), response, self.spider)
                if logkws is not None:
                    logger.log(*logformatter_adapter(logkws), extra={"spider": self.spider})
            else:
                yield item
[docs]
class RootPathMiddleware(BaseSpiderMiddleware):
    """
    If the spider's ``root_path`` class attribute is non-empty, replace the item's ``data`` with the objects at that
    prefix; if there are multiple releases, records or packages at that prefix, combine them into packages in groups
    of 100, and update the item's ``data_type`` if needed. Otherwise, yield the original item.
    """

    async def process_spider_output(self, response, result):
        """Return a generator of File or FileItem objects, in which the ``data`` field is parsed JSON."""
        async for item in result:
            if not self.spider.root_path or not isinstance(item, File | FileItem):
                yield item
                continue

            data = item.data

            # Re-encode the data, to traverse the JSON using only ijson, instead of either ijson or Python.
            # This is only expected to occur when both `root_path` and `concatenated_json` are set.
            if isinstance(data, dict):
                data = orjson.dumps(data, default=util.default)

            iterable = util.transcode(self.spider, ijson.items, data, self.spider.root_path)

            if "item" in self.spider.root_path.split("."):
                # Two common issues in OCDS data are:
                #
                # - Multiple releases or records, without a package
                # - Multiple packages in a single file (often with a single release, record or OCID per package)
                #
                # Yielding each release, record or package creates a lot of overhead in terms of the number of files
                # written, the number of messages in RabbitMQ and the number of rows in PostgreSQL.
                #
                # We re-package in groups of 100 to reduce the overhead.
                is_package = "package" in item.data_type
                if "release" in item.data_type:
                    key = "releases"
                    item.data_type = "release_package"
                else:
                    key = "records"
                    item.data_type = "record_package"

                for number, items in enumerate(util.grouper(iterable, group_size(self.spider)), 1):
                    if sample_filled(self.spider, number):
                        return

                    # Omit the None values returned by `grouper(*, fillvalue=None)`.
                    items = filter(None, items)

                    if is_package:
                        package = self._merge_packages(items, key)
                    else:
                        package = {"version": self.spider.ocds_version, key: list(items)}

                    yield self.spider.build_file_item(number, package, item)
            else:
                # Iterates at most once.
                for number, obj in enumerate(iterable, 1):
                    if sample_filled(self.spider, number):
                        return

                    item.data = obj
                    yield item

    @staticmethod
    def _merge_packages(packages, key):
        """
        Merge the ``key`` lists of all ``packages`` into the first package, and return it.

        :param packages: a non-empty iterator of package dicts
        :param str key: ``"releases"`` or ``"records"``
        :returns: the first package, with the other packages' releases or records appended
        """
        # Assume that the `extensions` are the same for all packages.
        package = next(packages)
        try:
            releases_or_records = package[key]
        except KeyError as e:
            logger.warning("%(key)s not set in %(data)r", {"key": e, "data": package})
            # Collect into a fresh list, rather than leaving the name unbound (or, in the original inline loop, bound
            # to the list of a previously yielded package).
            releases_or_records = []

        for other in packages:
            try:
                releases_or_records.extend(other[key])
            except KeyError as e:
                logger.warning("%(key)s not set in %(data)r", {"key": e, "data": other})

        # Only attach the fresh list if the first package lacked the key and other packages contributed to it.
        if key not in package and releases_or_records:
            package[key] = releases_or_records
        return package
[docs]
class AddPackageMiddleware(BaseSpiderMiddleware):
    """
    If the item's ``data_type`` is "release" or "record", wrap the item's ``data`` in an appropriate package, and
    update the item's ``data_type`` (to "release_package" or "record_package"). Otherwise, yield the original item.
    """

    async def process_spider_output(self, response, result):
        """Return a generator of File or FileItem objects, in which the ``data`` field is parsed JSON."""
        async for item in result:
            if isinstance(item, File | FileItem) and item.data_type in {"release", "record"}:
                read_data_from_file_if_any(item)
                payload = item.data

                # If the spider's `root_path` class attribute is non-empty, then the JSON data is already parsed.
                if isinstance(payload, bytes):
                    payload = orjson.loads(payload)

                key = {"release": "releases", "record": "records"}[item.data_type]
                item.data = {"version": self.spider.ocds_version, key: [payload]}
                item.data_type = f"{item.data_type}_package"
            yield item
[docs]
class ResizePackageMiddleware(BaseSpiderMiddleware):
    """
    If the spider's ``resize_package`` class attribute is ``True``, split the package into packages of 100 releases or
    records each. Otherwise, yield the original item.

    Optionally, implement an ``ocid_fallback`` method on the spider, which accepts a release (or record) and returns
    an ``ocid`` value, to be used if the ``ocid`` field is not set.
    """

    async def process_spider_output(self, response, result):
        """
        Yield the original item, or FileItem objects built by the spider's ``build_file_item`` for each group.

        The spider must yield items whose ``data`` field has ``package`` and ``data`` keys.
        """
        async for item in result:
            if not (self.spider.resize_package and isinstance(item, File)):
                yield item
                continue

            payload = item.data
            key = "releases" if item.data_type == "release_package" else "records"

            # Parse JSON in a worker thread so the reactor can dispatch other work (e.g. pika callbacks
            # for the kingfisher_process_api2 extension) while big files are split.
            template = await run_in_thread(self._get_package_metadata, payload["package"], key, item.data_type)
            batches = util.grouper(
                util.transcode(self.spider, ijson.items, payload["data"], f"{key}.item"),
                group_size(self.spider),
            )

            number = 0
            # Each group is also pulled in a worker thread, since advancing the iterator parses more JSON.
            while (batch := await run_in_thread(next, batches, None)) is not None:
                number += 1
                if sample_filled(self.spider, number):
                    return

                # Kingfisher Process merges only releases and records with OCIDs.
                if hasattr(self.spider, "ocid_fallback"):
                    for entry in batch:
                        if entry and "ocid" not in entry:
                            entry["ocid"] = self.spider.ocid_fallback(entry)

                package = copy.deepcopy(template)
                # Omit the None values returned by `grouper(*, fillvalue=None)`.
                package[key] = [entry for entry in batch if entry]
                yield self.spider.build_file_item(number, package, item)

    def _get_package_metadata(self, data, skip_key, data_type):
        """
        Return the package metadata from a file object.

        Collects the pairs produced by ``util.items`` at the root prefix, skipping ``skip_key``
        (presumably every top-level package field except the releases or records — confirm in ``util.items``).
        If ``data_type`` is not a package type, return an empty dict.

        :param data: a data object
        :param str skip_key: the key to skip
        :param str data_type: the item's data type
        :returns: the package metadata
        :rtype: dict
        """
        metadata = {}
        if "package" not in data_type:
            return metadata
        events = util.transcode(self.spider, ijson.parse, data)
        for pair in util.items(events, "", skip_key=skip_key):
            metadata.update(pair)
        return metadata
[docs]
class ReadDataMiddleware(BaseSpiderMiddleware):
    """
    If the item's ``data`` is a file descriptor, replace the item's ``data`` with the file's contents and close the
    file descriptor. Otherwise, yield the original item.

    .. seealso::

       :class:`~kingfisher_scrapy.base_spiders.compressed_file_spider.CompressedFileSpider`
    """

    async def process_spider_output(self, response, result):
        """Return a generator of File objects, in which the ``data`` field is bytes."""
        async for item in result:
            # Only File items with a readable `data` are materialized; every item is yielded either way.
            if isinstance(item, File) and hasattr(item.data, "read"):
                read_data_from_file_if_any(item)
            yield item
[docs]
class HttpErrorMiddleware(BaseSpiderMiddleware):
    """
    Handle HTTP errors raised by Scrapy's HttpErrorMiddleware.

    If the response status is 429, or :meth:`~kingfisher_scrapy.base_spider.BaseSpider.is_http_retryable` returns
    ``True``, and the number of attempts is less than the spider's ``max_attempts`` class attribute, retries the
    request, after waiting the number of seconds returned by
    :meth:`~kingfisher_scrapy.base_spider.BaseSpider.get_retry_wait_time`.

    Otherwise, logs an error message.
    """

    def process_spider_exception(self, response, exception):
        """
        Retry or log an ``HttpError``; ignore any other exception.

        :returns: a one-element list with the retry request, or ``None``
        """
        if not isinstance(exception, HttpError):
            return None

        spider = self.spider
        attempts = response.request.meta.get("retries", 0) + 1

        # Scrapy doesn't honor the Retry-After header. https://github.com/scrapy/scrapy/issues/3849
        should_retry = response.status == 429 or spider.is_http_retryable(response)
        if should_retry and attempts < spider.max_attempts:
            return [self._build_retry(response, attempts)]

        if spider.is_http_retryable(response):
            # Spiders that set `retry_http_codes` aren't expected to also define `is_http_error_expected()`.
            spider.log_error_from_response(response, message=f"Gave up retrying (failed {attempts} times)")
        else:
            level = "warning" if spider.is_http_error_expected(response) else "error"
            spider.log_error_from_response(response, level=level)
        return None

    def _build_retry(self, response, attempts):
        """Return a non-filtered copy of the failed request, carrying the retry count and wait time, and log it."""
        wait_time = self.spider.get_retry_wait_time(response)
        retry = response.request.copy()
        retry.meta["retries"] = attempts
        retry.meta["wait_time"] = wait_time
        retry.dont_filter = True
        logger.debug(
            "Retrying %(request)s in %(wait_time)ds (failed %(failures)d times): HTTP %(status)d",
            {"request": response.request, "failures": attempts, "status": response.status, "wait_time": wait_time},
        )
        return retry
[docs]
class RetryDataErrorMiddleware:
    """
    Retry a request up to 3 times.

    Either when the spider raises a ``BadZipFile`` exception, on the assumption that the response was truncated,
    or when the spider raises a ``RetryableError`` exception.
    """

    # https://docs.scrapy.org/en/latest/topics/spider-middleware.html#scrapy.spidermiddlewares.SpiderMiddleware.process_spider_exception
    def process_spider_exception(self, response, exception):
        """
        Return a one-element list with a retry request, an empty list after giving up, or ``None`` for exceptions
        this middleware does not handle.
        """
        if not isinstance(exception, BadZipFile | RetryableError):
            return None

        failed_request = response.request
        failures = failed_request.meta.get("retries", 0) + 1
        log_args = {"request": failed_request, "failures": failures, "exception": exception}

        if failures <= 3:
            retry = failed_request.copy()
            retry.dont_filter = True
            retry.meta["retries"] = failures
            logger.debug("Retrying %(request)s (failed %(failures)d times): %(exception)s", log_args)
            return [retry]

        logger.error("Gave up retrying %(request)s (failed %(failures)d times): %(exception)s", log_args)
        return []