import json
from datetime import datetime, timedelta
from math import ceil

import scrapy

from kingfisher_scrapy.base_spiders import BaseSpider
from kingfisher_scrapy.exceptions import AccessTokenError, MissingEnvVarError
from kingfisher_scrapy.util import parameters

HEADERS = {'Accept': '*/*', 'Content-Type': 'application/json'}

class Openopps(BaseSpider):
"""
Domain
OpenOpps
Spider arguments
from_date
Download only data from this date onward (YYYY-MM-DD format).
If ``until_date`` is provided, defaults to '2011-01-01'.
until_date
Download only data until this date (YYYY-MM-DD format).
If ``from_date`` is provided, defaults to today.
Environment variables
KINGFISHER_OPENOPPS_USERNAME
To get an API account, contact contact@openopps.com.
KINGFISHER_OPENOPPS_PASSWORD
Your API account password.
API documentation
https://docs.google.com/document/d/1u0da3BTU7fBFjX6i7j_tKXa1YwdXL7hY4Kw9GdsaAr0/edit
Swagger API documentation
https://api.openopps.com/api/schema/
"""
name = 'openopps'
download_delay = 1
custom_settings = {
'DOWNLOADER_MIDDLEWARES': {
'kingfisher_scrapy.downloadermiddlewares.OpenOppsAuthMiddleware': 543,
},
}
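    # Note: OpenOppsAuthMiddleware is assumed to attach ``access_token`` (set in
    # parse_access_token below) to the headers of outgoing requests, skipping
    # requests flagged with ``token_request`` in their meta.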
# BaseSpider
ocds_version = '1.0'
default_from_date = '2011-01-01'
root_path = 'results.item.json'
dont_truncate = True
# Local
access_token = None
    api_limit = 10000  # OpenOpps API limit on the number of search results per query
    request_time_limit = 60  # minutes before requesting a new access token
    reauthenticating = False  # whether a re-authentication request is in flight
start_time = None
data_type = 'release_package'
url_pattern = 'https://api.openopps.com/api/ocds/?format=json&ordering=releasedate&page_size=1000&' \
'releasedate__gte={releasedate__gte}&releasedate__lte={releasedate__lte}'
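    # For example, with both dates set to 2011-01-01, the formatted URL is:
    # https://api.openopps.com/api/ocds/?format=json&ordering=releasedate&page_size=1000&releasedate__gte=2011-01-01&releasedate__lte=2011-01-01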

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
spider.username = crawler.settings.get('KINGFISHER_OPENOPPS_USERNAME')
spider.password = crawler.settings.get('KINGFISHER_OPENOPPS_PASSWORD')
if spider.username is None or spider.password is None:
raise MissingEnvVarError('KINGFISHER_OPENOPPS_USERNAME and/or KINGFISHER_OPENOPPS_PASSWORD is not set.')
return spider

    def start_requests(self):
yield self.build_access_token_request(initial_authentication=True)

    def build_access_token_request(self, initial_authentication, **kwargs):
return scrapy.Request(
'https://api.openopps.com/api/api-token-auth/',
method='POST',
headers=HEADERS,
body=json.dumps({'username': self.username, 'password': self.password}),
            # Allow duplicate requests when we re-authenticate before the token expires
dont_filter=True,
# Flag request access token for middleware and initial authentication for callback function
meta={'token_request': True, 'initial_authentication': initial_authentication},
callback=self.parse_access_token,
**kwargs
)
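    # The api-token-auth endpoint is expected to respond with JSON of the form
    # {"token": "..."}, which parse_access_token below extracts.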

    def parse_access_token(self, response):
        if self.is_http_success(response):
            token = response.json().get('token')
            if token:
                self.logger.info('New access token: %s', token)
                self.access_token = f'JWT {token}'
                self.start_time = datetime.now()
                # If this is the initial authentication, start the page requests
                if response.request.meta.get('initial_authentication'):
                    return self.start_requests_pages()
                # Otherwise, we re-authenticated: clear the flag and continue
                self.reauthenticating = False
                return
        self.logger.error('Authentication failed. Status code: %s. %s', response.status, response.text)
        raise AccessTokenError()

    def start_requests_pages(self):
        search_h = 24  # initial search window in hours (start with one full day)
# Case if we have date range parameters
if self.from_date and self.until_date:
yield from self.request_range_per_day(self.from_date, self.until_date, search_h)
else:
            # Use larger ranges for periods with fewer than `api_limit` search results
            release_date_gte_list = ['1970-01-01', '2009-01-01', '2010-01-01', '2010-07-01']
            release_date_lte_list = ['2008-12-31', '2009-12-31', '2010-06-30', '2010-12-31']
            for gte, lte in zip(release_date_gte_list, release_date_lte_list):
                yield self.request_range(gte, lte, search_h)
            # Use smaller ranges (day by day) for periods with more than `api_limit` search results
            today = datetime.now()
            for year in range(2011, today.year + 1):
                start_date = datetime(year, 1, 1)
                if year == today.year:
                    end_date = datetime(year, today.month, today.day)
                else:
                    end_date = datetime(year, 12, 31)
                yield from self.request_range_per_day(start_date, end_date, search_h)

    def request_range(self, start_date, end_date, search_h):
return self.build_request(
self.url_pattern.format(releasedate__gte=start_date, releasedate__lte=end_date),
formatter=parameters('releasedate__gte', 'releasedate__lte'),
meta={
'release_date': start_date,
'search_h': search_h,
},
headers=HEADERS
)

    def request_range_per_day(self, start_date, end_date, search_h):
date_list = [(start_date + timedelta(days=d)).strftime('%Y-%m-%d')
for d in range((end_date - start_date).days + 1)]
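        # For example, 2011-01-01 through 2011-01-03 yields '2011-01-01',
        # '2011-01-02' and '2011-01-03', each then requested as a single-day range.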
for date in date_list:
yield self.request_range(date, date, search_h)

    def parse(self, response):
if self.is_http_success(response):
data = response.json()
count = data['count']
            release_date = response.request.meta['release_date']  # date used for the search
            search_h = response.request.meta['search_h']  # search window in hours
            # If the result count fits within the API limit, or we have already
            # narrowed the search to a single hour, save this page and paginate.
            if count <= self.api_limit or search_h == 1:
yield self.build_file_from_response(response, data_type=self.data_type)
next_url = data.get('next')
if next_url:
yield self.build_request(
next_url,
formatter=parameters('releasedate__gte', 'releasedate__lte', 'page'),
meta={
'release_date': release_date,
'search_h': search_h,
},
headers=HEADERS
)
                # Re-authenticate before the token expires, if enough time has passed
                time_diff = datetime.now() - self.start_time
                if not self.reauthenticating and time_diff.total_seconds() > self.request_time_limit * 60:
                    self.logger.info('Seconds since last authentication: %s', time_diff.total_seconds())
self.reauthenticating = True
yield self.build_access_token_request(initial_authentication=False, priority=1000)
else:
                # Otherwise (count exceeds the API limit and search_h > 1 hour), split
                # the search window into smaller hour ranges
                parts = ceil(count / self.api_limit)  # number of parts into which to split the search
                split_h = ceil(search_h / parts)  # hours per part
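                # Worked example: count=25000 and search_h=24 give parts=3 and
                # split_h=8, i.e. three 8-hour windows: 00:00:00-07:59:59,
                # 08:00:00-15:59:59 and 16:00:00-23:59:59.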
                # If last_hour is set, we are re-splitting an existing hour range;
                # otherwise, we are splitting a whole day for the first time.
                last_hour = response.request.meta.get('last_hour')
                if last_hour:
                    date = datetime.strptime(release_date, '%Y-%m-%dT%H:%M:%S')  # release_date includes a start hour
                else:
                    date = datetime.strptime(release_date, '%Y-%m-%d')  # release_date is a whole day
                    last_hour = f'{date.strftime("%Y-%m-%d")}T23:59:59'  # last second of the day
# Create time lists depending on how many hours we split a search
start_hours = [
(date + timedelta(hours=h)).strftime('%Y-%m-%dT%H:%M:%S')
for h in range(0, search_h, split_h)
]
end_hours = [
(date + timedelta(hours=h, minutes=59, seconds=59)).strftime('%Y-%m-%dT%H:%M:%S')
for h in range(split_h - 1, search_h, split_h)
]
                # If split_h doesn't evenly divide search_h, end_hours is one short; append the range's end time
if len(start_hours) != len(end_hours):
end_hours.append(last_hour)
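                # For example, parts=5 gives split_h=5: five start hours (00, 05, 10,
                # 15, 20) but only four computed end hours, so 23:59:59 is appended.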
self.logger.info('Changing filters, split in %s: %s.', parts, response.request.url)
                for start_hour, end_hour in zip(start_hours, end_hours):
                    yield self.build_request(
                        self.url_pattern.format(releasedate__gte=start_hour, releasedate__lte=end_hour),
                        formatter=parameters('releasedate__gte', 'releasedate__lte'),
                        meta={
                            'release_date': start_hour,  # release_date with start hour
                            'last_hour': end_hour,  # release_date with last hour
                            'search_h': split_h,  # new, narrower search range
                        },
                        headers=HEADERS
                    )
        else:
            # A one-hour range can still exceed 10,000 search results; the API then
            # returns status 500 for page 11 (page_size=1000), so only the first
            # 10,000 results can be saved.
            if response.status == 500 and 'page=11' in response.request.url:
                self.logger.error('Status: %s. Results exceeded the API limit within a one-hour range; only the '
                                  'first 10,000 results are saved for: %s', response.status, response.request.url)
            else:
                yield self.build_file_error_from_response(response)