Source code for kingfisher_scrapy.spidermiddlewares

import copy
import logging
from zipfile import BadZipFile

import ijson
import orjson
from scrapy.exceptions import DropItem
from scrapy.spidermiddlewares.httperror import HttpError
from scrapy.utils.asyncio import run_in_thread
from scrapy.utils.log import logformatter_adapter

from kingfisher_scrapy import util
from kingfisher_scrapy.exceptions import RetryableError
from kingfisher_scrapy.items import File, FileItem

MAX_GROUP_SIZE = 100

logger = logging.getLogger(__name__)


# Avoid reading the rest of a large file, since the rest of the items will be dropped.
def sample_filled(spider, number):
    """
    Return whether ``number`` exceeds the spider's sample size.

    :param spider: an object whose ``sample`` attribute is the sample size, or a falsy value if not sampling
    :param int number: the 1-based position of the item about to be yielded
    :returns: the falsy ``sample`` value itself if the spider isn't sampling, otherwise whether ``number`` is
              greater than the sample size
    """
    limit = spider.sample
    if not limit:
        # Not sampling: pass the falsy value through, so that callers' truthiness checks fail.
        return limit
    return number > limit
def group_size(spider):
    """
    Return the number of releases, records or packages to put in each group.

    :param spider: an object whose ``sample`` attribute is the sample size, or a falsy value if not sampling
    :returns: ``MAX_GROUP_SIZE`` if the spider isn't sampling, otherwise the smaller of the sample size and
              ``MAX_GROUP_SIZE``
    """
    sample = spider.sample
    return min(sample, MAX_GROUP_SIZE) if sample else MAX_GROUP_SIZE
def read_data_from_file_if_any(item):
    """
    Replace the item's ``data`` with the file's contents, if ``data`` is a file-like object.

    An object is treated as file-like if it has a ``read`` attribute. The file is closed after reading, including
    when reading raises an exception, in which case ``item.data`` is left unchanged and the exception propagates.

    :param item: an object with a ``data`` attribute
    """
    data = item.data
    if not hasattr(data, "read"):
        return

    # Close the file even if reading fails, so that the file descriptor isn't leaked.
    try:
        content = data.read()
    finally:
        data.close()

    item.data = content
class BaseSpiderMiddleware:
    """
    Base class for spider middlewares that need access to the spider instance.

    Stores the crawler's ``spider``, ``logformatter`` and ``stats`` as instance attributes.
    """

    def __init__(self, crawler):
        """
        :param crawler: an object with ``spider``, ``logformatter`` and ``stats`` attributes
        """
        self.stats = crawler.stats
        self.logformatter = crawler.logformatter
        self.spider = crawler.spider

    @classmethod
    def from_crawler(cls, crawler):
        """Return an instance of the middleware, initialized from the crawler."""
        instance = cls(crawler)
        return instance
class ConcatenatedJSONMiddleware(BaseSpiderMiddleware):
    """
    If the spider's ``concatenated_json`` class attribute is ``True``, yield each object of the File as a FileItem.
    Otherwise, yield the original item.
    """

    async def process_spider_output(self, response, result):
        """
        Yield one item per concatenated JSON object in each File, built by the spider's ``build_file_item`` method
        from the parsed object. Items that aren't File instances are yielded unchanged.

        Once the spider's sample is filled, the generator stops, without consuming the rest of ``result``.
        """
        async for item in result:
            if self.spider.concatenated_json and isinstance(item, File):
                # ijson accepts either bytes or a file-like object, so the data is passed as-is.
                objects = util.transcode(self.spider, ijson.items, item.data, "", multiple_values=True)
                for number, obj in enumerate(objects, 1):
                    if sample_filled(self.spider, number):
                        return
                    yield self.spider.build_file_item(number, obj, item)
            else:
                yield item
class LineDelimitedMiddleware(BaseSpiderMiddleware):
    """
    If the spider's ``line_delimited`` class attribute is ``True``, yield each line of the File as a FileItem.
    Otherwise, yield the original item.
    """

    async def process_spider_output(self, response, result):
        """
        Yield one item per line in each File, built by the spider's ``build_file_item`` method from the raw line.
        Items that aren't File instances are yielded unchanged.

        Once the spider's sample is filled, the generator stops, without consuming the rest of ``result``.
        """
        async for item in result:
            if not (self.spider.line_delimited and isinstance(item, File)):
                yield item
                continue

            lines = item.data
            # Bytes are split into lines (keeping line endings). Anything else is iterated as-is, on the
            # expectation that it is a file-like object that yields lines.
            if isinstance(lines, bytes):
                lines = lines.splitlines(keepends=True)

            for number, line in enumerate(lines, 1):
                if sample_filled(self.spider, number):
                    return
                yield self.spider.build_file_item(number, line, item)
class ValidateJSONMiddleware(BaseSpiderMiddleware):
    """
    If the spider's ``validate_json`` class attribute is ``True``, check if the item's ``data`` field is valid JSON.
    If not, yield nothing. Otherwise, yield the original item.
    """

    async def process_spider_output(self, response, result):
        """
        Yield File or FileItem objects whose ``data`` field parses as JSON, and drop those whose doesn't.

        Items that aren't File or FileItem instances, and items whose ``data`` is already a dict, are yielded
        without validation. If ``data`` is a file-like object, it is replaced by the file's contents.
        """
        async for item in result:
            needs_check = (
                self.spider.validate_json
                and isinstance(item, File | FileItem)
                and not isinstance(item.data, dict)
            )
            if not needs_check:
                yield item
                continue

            read_data_from_file_if_any(item)
            try:
                # The parsed value is discarded: parsing only serves as validation.
                orjson.loads(item.data)
                yield item
            except orjson.JSONDecodeError:
                self._report_invalid_json(item, response)

    def _report_invalid_json(self, item, response):
        """Count the invalid item in the ``invalid_json_count`` stat, and log it as a dropped item."""
        self.stats.inc_value("invalid_json_count")
        # https://github.com/scrapy/scrapy/blob/49930df/scrapy/core/scraper.py#L504-L508
        logkws = self.logformatter.dropped(item, DropItem("Invalid JSON"), response, self.spider)
        if logkws is not None:
            logger.log(*logformatter_adapter(logkws), extra={"spider": self.spider})
class RootPathMiddleware(BaseSpiderMiddleware):
    """
    If the spider's ``root_path`` class attribute is non-empty, replace the item's ``data`` with the objects at that
    prefix; if there are multiple releases, records or packages at that prefix, combine them into packages in groups
    of 100, and update the item's ``data_type`` if needed. Otherwise, yield the original item.
    """

    async def process_spider_output(self, response, result):
        """
        Yield File or FileItem objects, in which the ``data`` field is parsed JSON.

        Items that aren't File or FileItem instances are yielded unchanged, as are all items if the spider's
        ``root_path`` is empty.

        If ``root_path`` has an ``item`` component, the objects at that prefix are grouped, and one item is built
        per group by the spider's ``build_file_item`` method. Otherwise, the item's ``data`` is replaced with the
        object at that prefix.

        Once the spider's sample is filled, the generator stops, without consuming the rest of ``result``.
        """
        async for item in result:
            if not self.spider.root_path or not isinstance(item, File | FileItem):
                yield item
                continue

            data = item.data

            # Re-encode the data, to traverse the JSON using only ijson, instead of either ijson or Python.
            # This is only expected to occur when both `root_path` and `concatenated_json` are set.
            if isinstance(data, dict):
                data = orjson.dumps(data, default=util.default)

            iterable = util.transcode(self.spider, ijson.items, data, self.spider.root_path)

            if "item" in self.spider.root_path.split("."):
                # Two common issues in OCDS data are:
                #
                # - Multiple releases or records, without a package
                # - Multiple packages in a single file (often with a single release, record or OCID per package)
                #
                # Yielding each release, record or package creates a lot of overhead in terms of the number of files
                # written, the number of messages in RabbitMQ and the number of rows in PostgreSQL.
                #
                # We re-package in groups of 100 to reduce the overhead.

                # Determine this before the data type is overwritten below.
                is_package = "package" in item.data_type
                if "release" in item.data_type:
                    key = "releases"
                    item.data_type = "release_package"
                else:
                    key = "records"
                    item.data_type = "record_package"

                for number, items in enumerate(util.grouper(iterable, group_size(self.spider)), 1):
                    if sample_filled(self.spider, number):
                        return

                    # Omit the None values returned by `grouper(*, fillvalue=None)`.
                    items = filter(None, items)

                    if is_package:
                        # Assume that the `extensions` are the same for all packages.
                        package = next(items)
                        try:
                            releases_or_records = package[key]
                        except KeyError as e:
                            logger.warning("%(key)s not set in %(data)r", {"key": e, "data": package})
                            # Start from an empty list on this package. Otherwise, the name would either be
                            # unbound, or still refer to the previous group's list, such that this group's
                            # releases or records would be appended to a package that was already yielded.
                            releases_or_records = package[key] = []

                        for other in items:
                            try:
                                releases_or_records.extend(other[key])
                            except KeyError as e:
                                logger.warning("%(key)s not set in %(data)r", {"key": e, "data": other})
                    else:
                        package = {"version": self.spider.ocds_version, key: list(items)}

                    yield self.spider.build_file_item(number, package, item)
            else:
                # Iterates at most once.
                for number, obj in enumerate(iterable, 1):
                    if sample_filled(self.spider, number):
                        return

                    item.data = obj
                    yield item
class AddPackageMiddleware(BaseSpiderMiddleware):
    """
    If the item's ``data_type`` is "release" or "record", wrap the item's ``data`` in an appropriate package, and
    update the item's ``data_type``. Otherwise, yield the original item.
    """

    async def process_spider_output(self, response, result):
        """
        Yield File or FileItem objects, in which a release or record is wrapped in a package.

        The package is a dict with a ``version`` key (the spider's ``ocds_version``) and a ``releases`` or
        ``records`` key (a one-element list). The item's ``data_type`` gets a ``_package`` suffix.
        """
        async for item in result:
            if isinstance(item, File | FileItem) and item.data_type in {"release", "record"}:
                read_data_from_file_if_any(item)

                contents = item.data
                # If the data isn't bytes, it is used as-is, on the expectation that it is already parsed
                # (for example, if the spider's `root_path` class attribute is non-empty).
                if isinstance(contents, bytes):
                    contents = orjson.loads(contents)

                if item.data_type == "release":
                    key = "releases"
                else:
                    key = "records"

                item.data = {"version": self.spider.ocds_version, key: [contents]}
                item.data_type += "_package"

            yield item
class ResizePackageMiddleware(BaseSpiderMiddleware):
    """
    If the spider's ``resize_package`` class attribute is ``True``, split the package into packages of 100 releases or
    records each. Otherwise, yield the original item.

    Optionally, implement an ``ocid_fallback`` method on the spider, which accepts a release (or record) and returns
    an ``ocid`` value, to be used if the ``ocid`` field is not set.
    """

    async def process_spider_output(self, response, result):
        """
        Yield one item per group of releases or records, built by the spider's ``build_file_item`` method from a
        package that combines the package metadata and the group. Items that aren't File instances are yielded
        unchanged.

        The spider must yield items whose ``data`` field has ``package`` and ``data`` keys.

        Once the spider's sample is filled, the generator stops, without consuming the rest of ``result``.
        """
        async for item in result:
            if not (self.spider.resize_package and isinstance(item, File)):
                yield item
                continue

            data = item.data
            if item.data_type == "release_package":
                key = "releases"
            else:
                key = "records"

            # Parse JSON in a worker thread so the reactor can dispatch other work (e.g. pika callbacks
            # for the kingfisher_process_api2 extension) while big files are split.
            template = await run_in_thread(self._get_package_metadata, data["package"], key, item.data_type)
            entries = util.transcode(self.spider, ijson.items, data["data"], f"{key}.item")
            groups = util.grouper(entries, group_size(self.spider))

            number = 0
            # `next` is given a default of None, to signal exhaustion without raising StopIteration in the thread.
            while (group := await run_in_thread(next, groups, None)) is not None:
                number += 1
                if sample_filled(self.spider, number):
                    return

                # Kingfisher Process merges only releases and records with OCIDs.
                if hasattr(self.spider, "ocid_fallback"):
                    for entry in group:
                        if entry and "ocid" not in entry:
                            entry["ocid"] = self.spider.ocid_fallback(entry)

                # Each yielded package gets its own copy of the metadata.
                package = copy.deepcopy(template)
                # Omit the None values returned by `grouper(*, fillvalue=None)`.
                package[key] = [entry for entry in group if entry]

                yield self.spider.build_file_item(number, package, item)

    def _get_package_metadata(self, data, skip_key, data_type):
        """
        Return the package metadata from a data object.

        :param data: a data object
        :param str skip_key: the key to skip
        :param str data_type: the item's data type; if it doesn't contain "package", the metadata is empty
        :returns: the package metadata
        :rtype: dict
        """
        package = {}
        if "package" not in data_type:
            return package

        events = util.transcode(self.spider, ijson.parse, data)
        for item in util.items(events, "", skip_key=skip_key):
            package.update(item)
        return package
class ReadDataMiddleware(BaseSpiderMiddleware):
    """
    If the item's ``data`` is a file descriptor, replace the item's ``data`` with the file's contents and close the
    file descriptor. Otherwise, yield the original item.

    .. seealso::

       :class:`~kingfisher_scrapy.base_spiders.compressed_file_spider.CompressedFileSpider`
    """

    async def process_spider_output(self, response, result):
        """
        Yield every item, after replacing the ``data`` field of File objects with the file's contents, if the
        ``data`` field has a ``read`` attribute.
        """
        async for item in result:
            if isinstance(item, File) and hasattr(item.data, "read"):
                read_data_from_file_if_any(item)
            yield item
class HttpErrorMiddleware(BaseSpiderMiddleware):
    """
    Handle HTTP errors raised by Scrapy's HttpErrorMiddleware.

    If the response's status is 429 or :meth:`~kingfisher_scrapy.base_spider.BaseSpider.is_http_retryable` returns
    ``True``, and the number of attempts is less than the spider's ``max_attempts`` class attribute, retries the
    request, after waiting the number of seconds returned by
    :meth:`~kingfisher_scrapy.base_spider.BaseSpider.get_retry_wait_time`.

    Otherwise, logs an error message.
    """

    def process_spider_exception(self, response, exception):
        """
        Return a one-element list containing a copy of the request to retry, or ``None`` if the exception isn't an
        ``HttpError`` or if the request isn't retried.

        The number of attempts is tracked in the request's ``retries`` meta key, and the time to wait in its
        ``wait_time`` meta key.
        """
        if not isinstance(exception, HttpError):
            return None

        attempts = response.request.meta.get("retries", 0) + 1

        # Scrapy doesn't honor the Retry-After header. https://github.com/scrapy/scrapy/issues/3849
        if response.status == 429 or self.spider.is_http_retryable(response):
            if attempts < self.spider.max_attempts:
                wait_time = self.spider.get_retry_wait_time(response)

                retry = response.request.copy()
                retry.meta["retries"] = attempts
                retry.meta["wait_time"] = wait_time
                # The URL was already requested, so bypass the duplicates filter.
                retry.dont_filter = True

                logger.debug(
                    "Retrying %(request)s in %(wait_time)ds (failed %(failures)d times): HTTP %(status)d",
                    {
                        "request": response.request,
                        "failures": attempts,
                        "status": response.status,
                        "wait_time": wait_time,
                    },
                )
                return [retry]

        if self.spider.is_http_retryable(response):
            # Spiders that set `retry_http_codes` aren't expected to also define `is_http_error_expected()`.
            self.spider.log_error_from_response(response, message=f"Gave up retrying (failed {attempts} times)")
        else:
            if self.spider.is_http_error_expected(response):
                level = "warning"
            else:
                level = "error"
            self.spider.log_error_from_response(response, level=level)

        return None
class RetryDataErrorMiddleware:
    """
    Retry a request up to 3 times.

    Either when the spider raises a ``BadZipFile`` exception, on the assumption that the response was truncated, or
    when the spider raises a ``RetryableError`` exception.
    """

    # https://docs.scrapy.org/en/latest/topics/spider-middleware.html#scrapy.spidermiddlewares.SpiderMiddleware.process_spider_exception

    def process_spider_exception(self, response, exception):
        """
        Return ``None`` if the exception is neither a ``BadZipFile`` nor a ``RetryableError``, an empty list if the
        request has failed more than 3 times, or otherwise a one-element list containing a copy of the request.

        The number of failures is tracked in the request's ``retries`` meta key.
        """
        if not isinstance(exception, BadZipFile | RetryableError):
            return None

        failures = response.request.meta.get("retries", 0) + 1
        context = {"request": response.request, "failures": failures, "exception": exception}

        if failures > 3:
            logger.error("Gave up retrying %(request)s (failed %(failures)d times): %(exception)s", context)
            return []

        retry = response.request.copy()
        # The URL was already requested, so bypass the duplicates filter.
        retry.dont_filter = True
        retry.meta["retries"] = failures

        logger.debug("Retrying %(request)s (failed %(failures)d times): %(exception)s", context)
        return [retry]