Source code for kingfisher_scrapy.extensions.kingfisher_process_api2

import asyncio
import functools
from urllib.parse import urljoin

import orjson
import requests
from scrapy import signals
from scrapy.exceptions import NotConfigured
from yapw.clients import Async

from kingfisher_scrapy.items import PluckedItem


class KingfisherProcessAPI2:
    """
    If the ``KINGFISHER_API2_URL``, ``RABBIT_URL``, ``RABBIT_EXCHANGE_NAME`` and ``RABBIT_ROUTING_KEY`` environment
    variables or configuration settings are set, then OCDS data is stored in Kingfisher Process, incrementally.

    When the spider is opened, a collection is created in Kingfisher Process via its web API. The API also receives the
    ``note`` and ``steps`` spider arguments (if set), the Scrapyd job (if the spider has a ``_job`` attribute) and the
    spider's ``ocds_version`` class attribute.

    When an item is scraped, a message is published to the exchange for Kingfisher Process in RabbitMQ, with the path
    to the file written by the :class:`~kingfisher_scrapy.extensions.files_store.FilesStore` extension.

    When the spider is closed, the collection is closed in Kingfisher Process via its web API, unless:

    -  the ``keep_collection_open`` spider argument was set to ``'true'``,
    -  the crawl was run by the :ref:`pluck` command, or
    -  the spider was closed with the ``shutdown`` reason (for example, if a Scrapyd job was cancelled), as closing
       the collection triggers the compilation of release collections.

    The API also receives the crawl statistics and the reason why the spider was closed.

    .. note::

       If the ``DATABASE_URL`` environment variable or configuration setting is set, this extension is disabled and the
       :class:`~kingfisher_scrapy.extensions.database_store.DatabaseStore` extension is enabled.

    .. note::

       This extension ignores items generated by the :ref:`pluck` command.
    """

    def __init__(self, url, stats, rabbit_url, rabbit_exchange_name, rabbit_routing_key):
        """
        :param url: the base URL of the Kingfisher Process web API
        :param stats: the crawler's stats collector
        :param rabbit_url: the RabbitMQ connection URL
        :param rabbit_exchange_name: the name of the RabbitMQ exchange to publish to
        :param rabbit_routing_key: the routing key of published messages
        """
        self.url = url
        self.stats = stats
        self.rabbit_url = rabbit_url
        self.rabbit_exchange_name = rabbit_exchange_name
        self.routing_key = rabbit_routing_key

        # The client, collection ID and shutdown trigger ID are set by the spider_opened handler, if the collection is
        # created. They are declared here, so that every attribute exists from construction.
        self.client = None
        self.collection_id = None
        self.shutdown_trigger_id = None

    @classmethod
    def from_crawler(cls, crawler):
        """
        Build the extension from the crawler's settings, and connect its signal handlers.

        :raises NotConfigured: if ``DATABASE_URL`` is set, or if any required setting is not set
        """
        url = crawler.settings["KINGFISHER_API2_URL"]
        rabbit_url = crawler.settings["RABBIT_URL"]
        rabbit_exchange_name = crawler.settings["RABBIT_EXCHANGE_NAME"]
        rabbit_routing_key = crawler.settings["RABBIT_ROUTING_KEY"]

        # The DatabaseStore extension takes precedence over this extension.
        if crawler.settings["DATABASE_URL"]:
            raise NotConfigured("DATABASE_URL is set.")
        if not url:
            raise NotConfigured("KINGFISHER_API2_URL is not set.")
        if not rabbit_url:
            raise NotConfigured("RABBIT_URL is not set.")
        if not rabbit_exchange_name:
            raise NotConfigured("RABBIT_EXCHANGE_NAME is not set.")
        if not rabbit_routing_key:
            raise NotConfigured("RABBIT_ROUTING_KEY is not set.")

        extension = cls(url, crawler.stats, rabbit_url, rabbit_exchange_name, rabbit_routing_key)
        crawler.signals.connect(extension.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(extension.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(extension.spider_closed, signal=signals.spider_closed)

        return extension

    def spider_opened(self, spider):
        """Send an API request to create a collection in Kingfisher Process."""
        data_version = spider.get_start_time("%Y-%m-%d %H:%M:%S")

        data = {
            "source_id": spider.name,
            "data_version": data_version,
            "sample": bool(spider.sample),
            "upgrade": spider.ocds_version == "1.0",
        }
        if spider.kingfisher_process_note:
            data["note"] = spider.kingfisher_process_note
        if hasattr(spider, "_job"):
            data["job"] = spider._job
        for step in spider.kingfisher_process_steps:
            data[step] = True

        # This request must be synchronous, to have the collection ID for the item_scraped handler.
        response = self._post_synchronous(spider, "/api/collections/", data)

        if response.ok:
            # https://docs.scrapy.org/en/latest/topics/asyncio.html#handling-a-pre-installed-reactor
            from twisted.internet import reactor  # noqa: PLC0415

            self.collection_id = response.json()["collection_id"]

            # WARNING! If this log message is changed, update the regular expression in the data_registry/
            # process_manager/task/collect.py file in the open-contracting/data-registry repository to match.
            spider.logger.info("Created collection %d in Kingfisher Process (%s)", self.collection_id, data_version)

            self.client = Async(
                url=self.rabbit_url,
                exchange=self.rabbit_exchange_name,
                routing_key_template="{routing_key}",
                # When running `scrapy crawl`, the event loop isn't running when __init__ is called.
                custom_ioloop=asyncio.get_running_loop(),
                manage_ioloop=False,
            )
            # Connect to RabbitMQ only if a collection_id is set, as other signals don't use RabbitMQ, otherwise.
            self.client.start()

            # Ensure the RabbitMQ connection is closed during reactor shutdown.
            self.shutdown_trigger_id = reactor.addSystemEventTrigger("before", "shutdown", self.disconnect)
        else:
            self._response_error(spider, "Failed to create collection", response)

    def spider_closed(self, spider, reason):
        """Send an API request to close the collection in Kingfisher Process."""
        if not self.collection_id:
            return

        # Scrapyd's cancel.json endpoint sends a SIGINT signal to the Scrapy process, which uses the "shutdown" reason.
        # If a process is cancelled, don't close the collection, as this triggers compilation of release collections.
        if spider.pluck or spider.kingfisher_process_keep_collection_open or reason == "shutdown":
            return

        response = self._post_synchronous(
            spider,
            f"/api/collections/{self.collection_id}/close/",
            {
                "reason": reason,
                # Round-trip through orjson, to stringify values like datetime objects.
                "stats": orjson.loads(orjson.dumps(self.stats.get_stats(), default=str)),
            },
        )

        if response.ok:
            spider.logger.info("Closed collection %d in Kingfisher Process", self.collection_id)
        else:
            self._response_error(spider, "Failed to close collection", response)

    def item_scraped(self, item, spider):
        """Publish a RabbitMQ message to store the file or file item in Kingfisher Process."""
        if not self.collection_id:
            return

        if isinstance(item, PluckedItem):
            return

        data = {
            "collection_id": self.collection_id,
            "url": str(item.url),  # pydantic.HttpUrl
            "path": item.path,
        }

        # Publish on the client's event loop, deferring until the client is ready.
        cb = functools.partial(self._when_ready, self.client.publish, data, self.routing_key)
        self.client.connection.ioloop.call_soon_threadsafe(cb)

        # WARNING! Kingfisher Process's API reads this value.
        self.stats.inc_value("kingfisher_process_expected_files_count")

    def disconnect(self):
        """Close the RabbitMQ connection, once the client is ready. Do nothing if no client was created."""
        if self.client is None:
            return

        cb = functools.partial(self._when_ready, self.client.interrupt)
        self.client.connection.ioloop.call_soon_threadsafe(cb)

    def _post_synchronous(self, spider, path, data):
        """POST synchronous API requests to Kingfisher Process."""
        url = urljoin(self.url, path)
        spider.logger.debug("Sending synchronous request to Kingfisher Process at %s with %s", url, data)
        return requests.post(url, json=data, timeout=3600)  # 1h

    def _when_ready(self, function, *args):
        """Call ``function(*args)`` if the client is ready; otherwise, reschedule this check on the client's loop."""
        # Scrapy can sometimes reach signals before yapw reaches exchange_ready.
        if self.client.ready:
            function(*args)
        else:
            self.client.connection.ioloop.call_soon(self._when_ready, function, *args)

    def _response_error(self, spider, message, response):
        """Log a failed API response at the critical level."""
        spider.logger.critical(
            "%s: status=%d response=%r headers=%r", message, response.status_code, response.text, response.headers
        )