Source code for onlinejudge.service.atcoder

# Python Version: 3.x
# -*- coding: utf-8 -*-
"""
the module for AtCoder (https://atcoder.jp/)

:note: There are some useful endpoints:

    -   https://atcoder.jp/contests/abc001/standings/json
    -   https://atcoder.jp/users/chokudai/history/json

:note: There is an unofficial API https://github.com/kenkoooo/AtCoderProblems

:note: Some methods which are not inherited from the classes of :py:mod:`onlinejudge.type` may be changed in the future, because their specification is not fixed yet.
"""

import datetime
import itertools
import posixpath
import re
import urllib.parse
from typing import *

import bs4

import onlinejudge._implementation.logging as log
import onlinejudge._implementation.testcase_zipper
import onlinejudge._implementation.utils as utils
import onlinejudge.dispatch
import onlinejudge.type
from onlinejudge.type import *


def _list_alert(resp: requests.Response, soup: Optional[bs4.BeautifulSoup] = None, print_: bool = False) -> List[str]:
    """
    collect the messages of the alert boxes (``<div role="alert">``) found in a page

    :param resp: the response whose body is parsed when `soup` is not given
    :param soup: an already parsed document; when given, the body of `resp` is not parsed again
    :param print_: if true, each message is also logged as a warning
    :return: the messages in the order of appearance; the text fragments of each alert are stripped and joined with single spaces
    """
    if soup is None:
        # `resp.encoding` can be None when the server sends no usable "Content-Type"
        # (see the workaround in `_request`), and `bytes.decode(None)` raises TypeError.
        # Fall back to UTF-8, which is the encoding `_request` forces anyway.
        soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding or 'UTF-8'), utils.html_parser)
    msgs = []  # type: List[str]
    for alert in soup.find_all('div', attrs={'role': 'alert'}):
        msg = ' '.join(s.strip() for s in alert.strings if s.strip())
        if print_:
            log.warning('AtCoder says: %s', msg)
        msgs.append(msg)
    return msgs


def _request(*args, **kwargs):
    """
    send a request with :py:func:`utils.request`, force the encoding of the response to UTF-8, and log the alerts in the page

    This is a workaround. AtCoder's servers sometimes fail to send the "Content-Type" field.
    see https://github.com/kmyk/online-judge-tools/issues/28 and https://github.com/kmyk/online-judge-tools/issues/232

    :return: the response, whose `encoding` is always ``'UTF-8'``
    """
    response = utils.request(*args, **kwargs)
    content_type = response.headers.get('Content-Type', '(not sent)')
    log.debug('AtCoder\'s server said "Content-Type: %s"', content_type)
    # do not trust the header; always decode as UTF-8
    response.encoding = 'UTF-8'
    _list_alert(response, print_=True)
    return response


class AtCoderService(onlinejudge.type.Service):
    def login(self, *, get_credentials: onlinejudge.type.CredentialsProvider, session: Optional[requests.Session] = None) -> None:
        """
        log in with the username and password returned by `get_credentials`; does nothing if already logged in

        :raises LoginError:
        """
        session = session or utils.get_default_session()
        if self.is_logged_in(session=session):
            return

        # fetch the login page and find the form in it
        login_url = 'https://atcoder.jp/login'
        page = _request('GET', login_url, session=session, allow_redirects=False)
        document = bs4.BeautifulSoup(page.content.decode(page.encoding), utils.html_parser)
        form_tag = document.find('form', action='')
        if not form_tag:
            raise LoginError('something wrong')

        # fill the form with the credentials and send it
        username, password = get_credentials()
        sender = utils.FormSender(form_tag, url=page.url)
        sender.set('username', username)
        sender.set('password', password)
        result = sender.request(session)
        _list_alert(result, print_=True)

        # AtCoder redirects to the top page if success, so the final URL still containing "login" means a failure
        if 'login' in result.url:
            log.failure('Username or Password is incorrect.')
            raise LoginError
        log.success('Welcome,')

    def get_url_of_login_page(self) -> str:
        return 'https://atcoder.jp/login'

    def is_logged_in(self, *, session: Optional[requests.Session] = None) -> bool:
        """
        :return: whether the submit page of the practice contest answers with 200 when redirects are not followed
        """
        session = session or utils.get_default_session()
        response = _request('GET', 'https://atcoder.jp/contests/practice/submit', session=session, allow_redirects=False)
        return response.status_code == 200

    def get_url(self) -> str:
        return 'https://atcoder.jp/'

    def get_name(self) -> str:
        return 'AtCoder'

    @classmethod
    def from_url(cls, url: str) -> Optional['AtCoderService']:
        """
        :param url: example:

        -   https://atcoder.jp/
        -   http://agc012.contest.atcoder.jp/
        """
        parsed = urllib.parse.urlparse(url)
        if parsed.scheme not in ('', 'http', 'https'):
            return None
        netloc = parsed.netloc
        if netloc in ('atcoder.jp', 'beta.atcoder.jp') or netloc.endswith('.contest.atcoder.jp'):
            return cls()
        return None

    def iterate_contest_data(self, *, lang: str = 'ja', session: Optional[requests.Session] = None) -> Iterator['AtCoderContestData']:
        """
        :param lang: must be `ja` (default) or `en`.
        :note: `lang=ja` is required to see some Japanese-local contests.
        :note: You can use `lang=en` to see the English names of contests.
        """
        assert lang in ('ja', 'en')
        session = session or utils.get_default_session()

        last_page = None  # unknown until the first page is parsed
        page = 1  # 1-based
        while last_page is None or page <= last_page:
            # get
            url = 'https://atcoder.jp/contests/archive?lang={}&page={}'.format(lang, page)
            resp = _request('GET', url, session=session)
            timestamp = datetime.datetime.now(datetime.timezone.utc).astimezone()

            # parse
            soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
            if last_page is None:
                # the last item of the pagination is read as the number of pages
                pagination_items = soup.find('ul', class_='pagination').find_all('li')
                last_page = int(pagination_items[-1].text)
                log.debug('last page: %s', last_page)
            for row in soup.find('tbody').find_all('tr'):
                yield AtCoderContestData._from_table_row(row, lang=lang, response=resp, session=session, timestamp=timestamp)
            page += 1

    def iterate_contests(self, *, lang: str = 'ja', session: Optional[requests.Session] = None) -> Iterator['AtCoderContest']:
        """
        the same as :py:meth:`iterate_contest_data`, but yields only the contest objects
        """
        yield from (data.contest for data in self.iterate_contest_data(lang=lang, session=session))

    def get_user_history_url(self, user_id: str) -> str:
        return 'https://atcoder.jp/users/{}/history/json'.format(user_id)
class AtCoderContestData(ContestData):
    """
    the data of a contest, as found in a row of the contest archive page

    :ivar contest: :py:class:`AtCoderContest`
    :ivar duration: :py:class:`datetime.timedelta`
    :ivar lang: :py:class:`str`
    :ivar name: :py:class:`str`
    :ivar rated_range: :py:class:`str`
    :ivar start_time: :py:class:`datetime.datetime`
    """

    # yapf: disable
    def __init__(
            self,
            *,
            contest: 'AtCoderContest',
            duration: datetime.timedelta,
            lang: str,
            name: str,
            rated_range: str,
            response: requests.Response,
            session: requests.Session,
            start_time: datetime.datetime,
            timestamp: datetime.datetime  # TODO: in Python 3.5, you cannot use both "*" and trailing ","
    ):
        # yapf: enable
        # values exposed via read-only properties
        self._contest = contest
        self._name = name
        self._response = response
        self._session = session
        self._timestamp = timestamp
        # plain public attributes
        self.duration = duration
        self.lang = lang
        self.rated_range = rated_range
        self.start_time = start_time

    @property
    def contest(self) -> 'AtCoderContest':
        return self._contest

    @property
    def name(self) -> str:
        return self._name

    @property
    def html(self) -> bytes:
        return self._response.content

    @property
    def response(self) -> requests.Response:
        return self._response

    @property
    def session(self) -> requests.Session:
        return self._session

    @property
    def timestamp(self) -> datetime.datetime:
        return self._timestamp

    @classmethod
    def _parse_start_time(cls, url: str) -> datetime.datetime:
        """
        read the time from the `iso` query parameter of `url`

        :param url: a URL with the query parameters `iso` (like ``20190101T2100``) and `p1`, where `p1` must be ``248``
        """
        # TODO: we need to use an ISO-format parser
        params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
        iso_values = params['iso']
        assert len(iso_values) == 1
        assert params['p1'] == ['248']  # means JST
        naive = datetime.datetime.strptime(iso_values[0], '%Y%m%dT%H%M')
        return naive.replace(tzinfo=utils.tzinfo_jst)

    @classmethod
    def _from_table_row(cls, tr: bs4.Tag, *, lang: str, response: requests.Response, session: requests.Session, timestamp: datetime.datetime) -> 'AtCoderContestData':
        """
        :param tr: a row with 4 cells: the start time, the name, the duration as ``H:MM``, and the rated range
        """
        cells = tr.find_all('td')
        assert len(cells) == 4
        time_cell, name_cell, duration_cell, rated_cell = cells
        time_anchor = time_cell.find('a')
        name_anchor = name_cell.find('a')

        # the contest id is the rest of the link after "/contests/"
        prefix = '/contests/'
        contest_path = name_anchor['href']
        assert contest_path.startswith(prefix)
        contest_id = contest_path[len(prefix):]
        name = name_anchor.text

        start_time = cls._parse_start_time(time_anchor['href'])
        hours, minutes = map(int, duration_cell.text.split(':'))
        return AtCoderContestData(
            contest=AtCoderContest(contest_id=contest_id),
            duration=datetime.timedelta(hours=hours, minutes=minutes),
            lang=lang,
            name=name,
            rated_range=rated_cell.text,
            session=session,
            start_time=start_time,
            response=response,
            timestamp=timestamp,
        )
class AtCoderContestDetailedData(AtCoderContestData):
    """
    the data of a contest, as found in the top page of the contest

    :ivar can_participate: :py:class:`str`
    :ivar penalty: :py:class:`datetime.timedelta`
    """

    def __init__(self, *, can_participate: str, penalty: datetime.timedelta, **kwargs):
        super().__init__(**kwargs)
        self.can_participate = can_participate
        self.penalty = penalty

    @classmethod
    def _from_response(cls, *, contest: 'AtCoderContest', lang: str, session: requests.Session, response: requests.Response, timestamp: datetime.datetime):
        """
        parse the top page of a contest

        :param lang: the language of the page, `en` or `ja`; it selects which "no penalty" text is recognized
        """
        soup = bs4.BeautifulSoup(response.content.decode(response.encoding), utils.html_parser)

        # the title looks like "<name> - <something>"; keep everything before the last " - "
        name = soup.find('title').text.rpartition(' - ')[0]

        # the two links in the duration element carry the start time and the end time
        duration_tag = soup.find('small', class_='contest-duration')
        start_time, end_time = [cls._parse_start_time(anchor['href']) for anchor in duration_tag.find_all('a')]
        duration = end_time - start_time

        can_participate = soup.find('span', text=re.compile(r'^(Can Participate|参加対象): ')).text.partition(': ')[2]
        rated_range = soup.find('span', text=re.compile(r'^(Rated Range|Rated対象): ')).text.partition(': ')[2]

        penalty_text = soup.find('span', text=re.compile(r'^(Penalty|ペナルティ): ')).text
        no_penalty_text = {'en': 'Penalty: None', 'ja': 'ペナルティ: なし'}.get(lang)
        if penalty_text == no_penalty_text:
            minutes = 0
        else:
            m = re.match(r'(Penalty|ペナルティ): (\d+)( minutes?|分)', penalty_text)
            assert m
            minutes = int(m.group(2))

        return AtCoderContestDetailedData(
            can_participate=can_participate,
            contest=contest,
            duration=duration,
            lang=lang,
            name=name,
            penalty=datetime.timedelta(minutes=minutes),
            rated_range=rated_range,
            response=response,
            session=session,
            start_time=start_time,
            timestamp=timestamp,
        )
class AtCoderContest(onlinejudge.type.Contest):
    """
    :ivar contest_id: :py:class:`str`
    """

    def __init__(self, *, contest_id: str):
        """
        :param contest_id: the id of the contest like ``agc030``, not a URL
        :raises ValueError: if `contest_id` looks like a URL
        """
        if contest_id.startswith('http'):
            # an exception should be raised since mypy cannot check this kind of failure
            raise ValueError('You should use AtCoderContest.from_url(url) instead of AtCoderContest(url)')
        self.contest_id = contest_id

    def get_url(self, *, type: Optional[str] = None, lang: Optional[str] = None) -> str:
        """
        :param type: `None` or `beta` for the URL under https://atcoder.jp/contests/ , `old` for the URL under the `.contest.atcoder.jp` subdomain
        :param lang: if given, it is appended as the `lang` query parameter
        """
        if type is None or type == 'beta':
            url = 'https://atcoder.jp/contests/{}'.format(self.contest_id)
        elif type == 'old':
            url = 'http://{}.contest.atcoder.jp/'.format(self.contest_id)
        else:
            assert False
        if lang is not None:
            url += '?lang={}'.format(lang)
        return url

    @classmethod
    def from_url(cls, url: str) -> Optional['AtCoderContest']:
        """
        :param url: example:

        -   https://kupc2014.contest.atcoder.jp/tasks/kupc2014_d
        -   https://atcoder.jp/contests/agc030

        :return: the contest, or `None` if `url` is not a URL of a contest of AtCoder
        """
        result = urllib.parse.urlparse(url)
        if result.scheme not in ('', 'http', 'https'):
            return None
        # `hostname` is None when the URL has no network location (e.g. a bare path, which passes
        # the scheme check above with scheme ''); such a URL must yield None, not an AttributeError
        hostname = result.hostname or ''

        # example: https://kupc2014.contest.atcoder.jp/tasks/kupc2014_d
        if hostname.endswith('.contest.atcoder.jp'):
            contest_id = utils.remove_suffix(hostname, '.contest.atcoder.jp')
            return cls(contest_id=contest_id)

        # example: https://atcoder.jp/contests/agc030
        if hostname in ('atcoder.jp', 'beta.atcoder.jp'):
            m = re.match(r'/contests/([\w\-_]+)/?.*', utils.normpath(result.path))
            if m:
                contest_id = m.group(1)
                return cls(contest_id=contest_id)

        return None

    def download_data(self, *, session: Optional[requests.Session] = None, lang: str = 'en') -> AtCoderContestDetailedData:
        """
        :param lang: must be `en` (default) or `ja`
        """
        assert lang in ('en', 'ja')
        session = session or utils.get_default_session()
        resp = _request('GET', self.get_url(type='beta', lang=lang), session=session)
        timestamp = datetime.datetime.now(datetime.timezone.utc).astimezone()
        return AtCoderContestDetailedData._from_response(contest=self, lang=lang, session=session, response=resp, timestamp=timestamp)

    def get_service(self) -> AtCoderService:
        return AtCoderService()

    def list_problem_data(self, *, session: Optional[requests.Session] = None) -> List['AtCoderProblemData']:
        """
        :return: the data of the problems, one for each row of the table in the `/tasks` page of the contest
        """
        # get
        session = session or utils.get_default_session()
        url = 'https://atcoder.jp/contests/{}/tasks'.format(self.contest_id)
        resp = _request('GET', url, session=session)
        timestamp = datetime.datetime.now(datetime.timezone.utc).astimezone()

        # parse
        soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
        tbody = soup.find('tbody')
        return [AtCoderProblemData._from_table_row(tr, session=session, response=resp, timestamp=timestamp) for tr in tbody.find_all('tr')]

    # TODO: why does this require "type: ignore"?
    def list_problems(self, *, session: Optional[requests.Session] = None) -> 'List[AtCoderProblem]':  # type: ignore
        return [data.problem for data in self.list_problem_data(session=session)]

    # yapf: disable
    def iterate_submission_data_where(
            self,
            *,
            me: bool = False,
            problem_id: Optional[str] = None,
            language_id: Optional[LanguageId] = None,
            status: Optional[str] = None,
            user_glob: Optional[str] = None,
            order: Optional[str] = None,
            desc: bool = False,
            lang: Optional[str] = None,
            pages: Optional[Iterator[int]] = None,
            session: Optional[requests.Session] = None  # TODO: in Python 3.5, you cannot use both "*" and trailing ","
    ) -> Iterator['AtCoderSubmissionData']:
        # yapf: enable
        """
        :note: If you use certain combination of options, then the results may not be correct when there are new submissions while crawling.
        :param status: must be one of `AC`, `WA`, `TLE`, `MLE`, `RE`, `CE`, `QLE`, `OLE`, `IE`, `WJ`, `WR`, or `Judging`
        :param order: must be one of `created`, `score`, `source_length`, `time_consumption`, or `memory_consumption`
        :param desc: requires `order` to be given
        :param me: use the `.../submissions/me` page instead of `.../submissions`
        :param problem_id: is used as the value of `f.Task` query parameter
        :param user_glob: is used as the value of `f.User` query parameter
        :param language_id: is used as the value of `f.Language` query parameter
        :param lang: must be one of `ja`, `en`; NOTE: this value is currently not used in the request
        :param pages: is an iterator to list the page numbers to GET; by default pages are visited from 1 until a page without submissions appears
        """
        assert status in (None, 'AC', 'WA', 'TLE', 'MLE', 'RE', 'CE', 'QLE', 'OLE', 'IE', 'WJ', 'WR', 'Judging')
        assert order in (None, 'created', 'score', 'source_length', 'time_consumption', 'memory_consumption')
        if desc:
            assert order is not None

        base_url = 'https://atcoder.jp/contests/{}/submissions'.format(self.contest_id)
        if me:
            base_url += '/me'
        params = {}
        if problem_id is not None:
            params['f.Task'] = problem_id
        if language_id is not None:
            params['f.Language'] = language_id
        if status is not None:
            params['f.Status'] = status
        if user_glob is not None:
            params['f.User'] = user_glob
        if order is not None:
            params['orderBy'] = order
        if desc:
            params['desc'] = 'true'

        # get
        session = session or utils.get_default_session()
        for page in pages or itertools.count(1):
            # the `page` parameter is sent only for the second page and later
            params_page = ({'page': str(page)} if page >= 2 else {})
            url = base_url + '?' + urllib.parse.urlencode({**params, **params_page})
            resp = _request('GET', url, session=session)
            timestamp = datetime.datetime.now(datetime.timezone.utc).astimezone()
            submissions = list(self._iterate_submission_data_from_response(resp=resp, session=session, timestamp=timestamp))
            if not submissions:
                # a page without submissions is treated as the end of the list
                break
            yield from submissions

    def _iterate_submission_data_from_response(self, *, resp: requests.Response, session: requests.Session, timestamp: datetime.datetime) -> Iterator['AtCoderSubmissionData']:
        """
        parse a page of the list of submissions; yields nothing if the page has no table body
        """
        soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
        tbodies = soup.find_all('tbody')
        if len(tbodies) == 0:
            return  # No Submissions
        assert len(tbodies) == 1
        tbody = tbodies[0]
        for tr in tbody.find_all('tr'):
            yield AtCoderSubmissionData._from_table_row(tr, response=resp, session=session, timestamp=timestamp)

    def iterate_submissions_where(self, **kwargs) -> Iterator['AtCoderSubmission']:
        """
        the same as :py:meth:`iterate_submission_data_where`, but yields only the submission objects
        """
        for data in self.iterate_submission_data_where(**kwargs):
            yield data.submission

    def iterate_submissions(self, *, session: Optional[requests.Session] = None) -> Iterator['AtCoderSubmission']:
        """
        :note: in implementation, the submissions are requested in ascending order of creation time (`order='created'`, `desc=False`), presumably so that submissions made while crawling appear at the end of the list
        """
        yield from self.iterate_submissions_where(order='created', desc=False, session=session)
class AtCoderProblemData(ProblemData):
    """
    :note: :py:class:`AtCoderProblemData` is obtained from the list page (e.g. https://atcoder.jp/contests/agc001/tasks )

    :ivar alphabet: :py:class:`str`
    :ivar memory_limit_byte: :py:class:`int`
    :ivar name: :py:class:`str`
    :ivar problem: :py:class:`AtCoderProblem`
    :ivar time_limit_msec: :py:class:`int`
    """

    # yapf: disable
    def __init__(
            self,
            *,
            alphabet: str,
            memory_limit_byte: int,
            name: str,
            problem: 'AtCoderProblem',
            response: Optional[requests.Response],
            session: Optional[requests.Session],
            time_limit_msec: int,
            timestamp: Optional[datetime.datetime],
            html: Optional[bytes] = None  # TODO: in Python 3.5, you cannot use both "*" and trailing ","
    ):
        # yapf: enable
        """
        :param html: if omitted, the content of `response` is used, and then `response` must not be `None`
        """
        # values exposed via read-only properties
        self._name = name
        self._problem = problem
        self._response = response
        self._session = session
        self._timestamp = timestamp
        # plain public attributes
        self.alphabet = alphabet
        self.memory_limit_byte = memory_limit_byte
        self.time_limit_msec = time_limit_msec
        if html is not None:
            self._html = html
        else:
            assert response is not None
            self._html = response.content

    @property
    def problem(self) -> 'AtCoderProblem':
        return self._problem

    @property
    def name(self) -> str:
        return self._name

    @property
    def html(self) -> bytes:
        return self._html

    @property
    def response(self) -> Optional[requests.Response]:
        return self._response

    @property
    def session(self) -> Optional[requests.Session]:
        return self._session

    @property
    def timestamp(self) -> Optional[datetime.datetime]:
        return self._timestamp

    @classmethod
    def _from_table_row(cls, tr: bs4.Tag, *, session: requests.Session, response: requests.Response, timestamp: datetime.datetime) -> 'AtCoderProblemData':
        """
        :param tr: a row with 4 or 5 cells: the alphabet, the name with a link, the time limit, the memory limit, and optionally a cell for the submit link
        """
        cells = tr.find_all('td')
        assert 4 <= len(cells) <= 5

        path = cells[1].find('a')['href']
        problem = AtCoderProblem.from_url('https://atcoder.jp' + path)
        assert problem is not None
        alphabet = cells[0].text
        name = cells[1].text

        # the time limit is written in msec or in sec
        time_text = cells[2].text
        if time_text.endswith(' msec'):
            time_limit_msec = int(utils.remove_suffix(time_text, ' msec'))
        elif time_text.endswith(' sec'):
            time_limit_msec = int(float(utils.remove_suffix(time_text, ' sec')) * 1000)
        else:
            assert False

        # the memory limit is written in KB or in MB
        memory_text = cells[3].text
        if memory_text.endswith(' KB'):
            memory_limit_byte = int(float(utils.remove_suffix(memory_text, ' KB')) * 1000)
        elif memory_text.endswith(' MB'):
            memory_limit_byte = int(float(utils.remove_suffix(memory_text, ' MB')) * 1000 * 1000)  # TODO: confirm this is MB truly, not MiB
        else:
            assert False

        if len(cells) == 5:
            assert cells[4].text.strip() in ('', 'Submit', '提出')

        return AtCoderProblemData(
            alphabet=alphabet,
            memory_limit_byte=memory_limit_byte,
            name=name,
            problem=problem,
            response=response,
            session=session,
            time_limit_msec=time_limit_msec,
            timestamp=timestamp,
        )

    @classmethod
    def _from_html(cls, html: bytes, *, problem: 'AtCoderProblem', session: Optional[requests.Session] = None, response: Optional[requests.Response] = None, timestamp: Optional[datetime.datetime] = None) -> 'AtCoderProblemData':
        """
        parse the page of a problem; the header is read as "<alphabet> - <name>" and the following paragraph as "<time limit> / <memory limit>"
        """
        soup = bs4.BeautifulSoup(html, utils.html_parser)
        header = soup.find('span', class_='h2')
        alphabet, _, name = header.text.partition(' - ')
        time_limit, memory_limit = header.find_next_sibling('p').text.split(' / ')

        # the label is written in Japanese or in English
        for time_limit_prefix in ('実行時間制限: ', 'Time Limit: '):
            if time_limit.startswith(time_limit_prefix):
                break
        else:
            assert False
        if time_limit.endswith(' msec'):
            time_value = utils.remove_suffix(utils.remove_prefix(time_limit, time_limit_prefix), ' msec')
            time_limit_msec = int(time_value)
        elif time_limit.endswith(' sec'):
            time_value = utils.remove_suffix(utils.remove_prefix(time_limit, time_limit_prefix), ' sec')
            time_limit_msec = int(float(time_value) * 1000)
        else:
            assert False

        for memory_limit_prefix in ('メモリ制限: ', 'Memory Limit: '):
            if memory_limit.startswith(memory_limit_prefix):
                break
        else:
            assert False
        if memory_limit.endswith(' KB'):
            memory_value = utils.remove_suffix(utils.remove_prefix(memory_limit, memory_limit_prefix), ' KB')
            memory_limit_byte = int(float(memory_value) * 1000)
        elif memory_limit.endswith(' MB'):
            memory_value = utils.remove_suffix(utils.remove_prefix(memory_limit, memory_limit_prefix), ' MB')
            memory_limit_byte = int(float(memory_value) * 1000 * 1000)
        else:
            assert False

        return AtCoderProblemData(
            alphabet=alphabet,
            html=html,
            memory_limit_byte=memory_limit_byte,
            name=name,
            problem=problem,
            response=response,
            session=session,
            time_limit_msec=time_limit_msec,
            timestamp=timestamp,
        )
class AtCoderProblemDetailedData(AtCoderProblemData):
    """
    :note: :py:class:`AtCoderProblemDetailedData` is obtained from the problem page (e.g. https://atcoder.jp/contests/agc001/tasks/agc001_a )

    :ivar available_languages: :py:class:`Optional` [ :py:class:`List` [ :py:class:`Language` ] ]
    :ivar input_format: :py:class:`Optional` [ :py:class:`str` ]
    :ivar sample_cases: :py:class:`Optional` [ :py:class:`List` [ :py:class:`TestCase` ] ]
    :ivar score: :py:class:`Optional` [ :py:class:`int` ]
    """

    # yapf: disable
    def __init__(
            self,
            *,
            available_languages: Optional[List[Language]],
            input_format: Optional[str],
            sample_cases: Optional[List[TestCase]],
            score: Optional[int],
            **kwargs  # TODO: in Python 3.5, you cannot use both "*" and trailing ","
    ):
        # yapf: enable
        super().__init__(**kwargs)
        self._sample_cases = sample_cases
        self.available_languages = available_languages
        self.input_format = input_format
        self.score = score

    @property
    def sample_cases(self) -> Optional[List[TestCase]]:
        return self._sample_cases

    @classmethod
    def _get_tag_lang(cls, tag: bs4.Tag) -> Optional[str]:
        """
        :return: the first class name starting with ``lang-`` among the ancestors of `tag`, searching from the nearest ancestor, or `None`
        """
        assert isinstance(tag, bs4.Tag)
        for ancestor in tag.parents:
            class_names = ancestor.attrs.get('class') or []
            for class_name in class_names:
                if class_name.startswith('lang-'):
                    return class_name
        return None

    @classmethod
    def _find_sample_tags(cls, soup: bs4.BeautifulSoup) -> Iterator[Tuple[bs4.Tag, bs4.Tag]]:
        """
        :return: pairs of a `pre` tag and the `h3` tag regarded as its title
        """

        def is_title(tag) -> bool:
            return bool(tag and tag.name == 'h3' and tag.string)

        for pre in soup.find_all('pre'):
            log.debug('pre tag: %s', str(pre))
            if not pre.string:
                continue

            # the first format: h3+pre
            previous = pre.find_previous_sibling()
            if is_title(previous):
                yield (pre, previous)

            # the second format: h3+section pre
            section = pre.parent
            if section and section.name == 'section':
                # ignore tags which are not samples
                # example: https://atcoder.jp/contests/abc003/tasks/abc003_4
                if pre.find_previous_sibling('pre') is None:
                    previous = section.find_previous_sibling()
                    if is_title(previous):
                        yield (pre, previous)

    @classmethod
    def _parse_sample_cases(cls, soup: bs4.BeautifulSoup) -> List[onlinejudge.type.TestCase]:
        """
        :raises SampleParseError:
        """
        zipper = onlinejudge._implementation.testcase_zipper.SampleZipper()
        first_lang = None  # the language class of the first sample which has one; samples of other languages are skipped
        for pre, h3 in cls._find_sample_tags(soup):
            text = utils.textfile(utils.dos2unix(pre.string.lstrip()))
            title = h3.string
            tag_lang = cls._get_tag_lang(pre)
            if first_lang is None:
                first_lang = tag_lang
            elif first_lang != tag_lang:
                log.debug('skipped due to language: current one is %s, not %s: %s ', first_lang, tag_lang, title)
                continue
            zipper.add(text.encode(), title)
        return zipper.get()

    @classmethod
    def _parse_input_format(cls, soup: bs4.BeautifulSoup) -> Optional[str]:
        """
        :return: the contents of the first `pre` tag found in a section for a header "入力" or "Input", or `None`
        """
        for h3 in soup.find_all('h3', text=re.compile(r'^(入力|Input)$')):
            section = h3.parent if h3.parent.name == 'section' else h3.find_next_sibling('section')
            if section is None:
                section = soup.find(class_='io-style')
            if section is None:
                log.warning('<section> tag not found. something wrong')
                return None
            pre = section.find('pre')
            if pre is not None:
                return pre.decode_contents(formatter=None)
        return None

    @classmethod
    def _parse_available_languages(cls, soup: bs4.BeautifulSoup, problem: 'AtCoderProblem') -> Optional[List[Language]]:
        """
        :return: the options of the language selector in the submit form, or `None` if the page has no submit form
        """
        form = soup.find('form', action='/contests/{}/submit'.format(problem.contest_id))
        if form is None:
            return None
        selector = form.find('div', id='select-lang').find('select', attrs={'name': 'data.LanguageId'})  # NOTE: AtCoder can vary languages depending on tasks, even in one contest. here, ignores this fact.
        languages = [Language(option.attrs['value'], option.string) for option in selector.find_all('option')]  # type: List[Language]
        return languages

    @classmethod
    def _parse_score(cls, soup: bs4.BeautifulSoup) -> Optional[int]:
        """
        :return: the score written in the first paragraph of the task statement, or `None` if it is absent or not a plain integer
        """
        statement = soup.find('div', id='task-statement')
        first_paragraph = statement.find('p')  # first
        if first_paragraph is None or not first_paragraph.text.startswith('配点 : '):
            return None
        score_text = utils.remove_suffix(utils.remove_prefix(first_paragraph.text, '配点 : '), ' 点')
        try:
            return int(score_text)
        except ValueError:
            # some problems have scores like "<p>配点 : \(100\) 点</p>", not "<p>配点 : 100 点</p>"
            # example: https://atcoder.jp/contests/wupc2019/tasks/wupc2019_a
            return None

    @classmethod
    def from_html(cls, html: bytes, *, problem: 'AtCoderProblem', session: Optional[requests.Session] = None, response: Optional[requests.Response] = None, timestamp: Optional[datetime.datetime] = None) -> 'AtCoderProblemDetailedData':
        """
        :param html: must be a HTML of the new (beta) version of AtCoder

        .. versionadded:: 6.2.0
        """
        soup = bs4.BeautifulSoup(html, utils.html_parser)

        # a failure of parsing samples is not fatal; it is recorded as `None`
        try:
            sample_cases = cls._parse_sample_cases(soup)  # type: Optional[List[TestCase]]
        except SampleParseError:
            sample_cases = None
        input_format = cls._parse_input_format(soup)
        available_languages = cls._parse_available_languages(soup, problem=problem)
        score = cls._parse_score(soup)

        # the rest of the fields are the same as the ones of the base class
        base = AtCoderProblemData._from_html(html, problem=problem, session=session, response=response, timestamp=timestamp)
        return AtCoderProblemDetailedData(
            alphabet=base.alphabet,
            available_languages=available_languages,
            html=base.html,
            input_format=input_format,
            memory_limit_byte=base.memory_limit_byte,
            name=base.name,
            problem=base.problem,
            response=base.response,
            sample_cases=sample_cases,
            score=score,
            session=base.session,
            time_limit_msec=base.time_limit_msec,
            timestamp=base.timestamp,
        )
class AtCoderProblem(onlinejudge.type.Problem):
    """
    :ivar contest_id: :py:class:`str`
    :ivar problem_id: :py:class:`str`

    :note: AtCoder has problems independently from contests. Therefore the notions `contest_id`, `alphabet`, and `url` don't belong to problems itself.
    """

    def __init__(self, *, contest_id: str, problem_id: str):
        self.contest_id = contest_id
        self.problem_id = problem_id  # NOTE: AtCoder calls this as "task_screen_name"

    def download_data(self, *, session: Optional[requests.Session] = None) -> AtCoderProblemDetailedData:
        """
        :raises Exception: if no such problem exists
        """
        session = session or utils.get_default_session()
        response = _request('GET', self.get_url(type='beta'), raise_for_status=False, session=session)
        timestamp = datetime.datetime.now(datetime.timezone.utc).astimezone()
        # the status is checked by hand, after a hint about the alerts in the page is given
        alerts = _list_alert(response)
        if alerts:
            log.warning('are you logged in?')
        response.raise_for_status()
        html = response.content.decode(response.encoding).encode()  # ensure UTF-8
        return AtCoderProblemDetailedData.from_html(html, problem=self, session=session, response=response, timestamp=timestamp)

    def download_sample_cases(self, *, session: Optional[requests.Session] = None) -> List[onlinejudge.type.TestCase]:
        """
        :raises requests.exceptions.HTTPError: if no such problem exists
        :raises SampleParseError: if parsing failed
        """
        session = session or utils.get_default_session()
        response = _request('GET', self.get_url(type='beta'), session=session)
        document = bs4.BeautifulSoup(response.content.decode(response.encoding), utils.html_parser)
        return AtCoderProblemDetailedData._parse_sample_cases(document)

    def get_url(self, *, type: Optional[str] = None, lang: Optional[str] = None) -> str:
        """
        :param type: `None` or `beta` for the URL under https://atcoder.jp/contests/ , `old` for the URL under the `.contest.atcoder.jp` subdomain
        :param lang: if given, it is appended as the `lang` query parameter
        """
        if type == 'old':
            url = 'http://{}.contest.atcoder.jp/tasks/{}'.format(self.contest_id, self.problem_id)
        elif type is None or type == 'beta':
            url = 'https://atcoder.jp/contests/{}/tasks/{}'.format(self.contest_id, self.problem_id)
        else:
            assert False
        if lang is None:
            return url
        return url + '?lang={}'.format(lang)

    def get_service(self) -> AtCoderService:
        return AtCoderService()

    def get_contest(self) -> AtCoderContest:
        return AtCoderContest(contest_id=self.contest_id)

    @classmethod
    def from_url(cls, s: str) -> Optional['AtCoderProblem']:
        """
        :param s: example:

        -   http://agc012.contest.atcoder.jp/tasks/agc012_d
        -   https://beta.atcoder.jp/contests/abc073/tasks/abc073_a
        """
        parsed = urllib.parse.urlparse(s)
        netloc = parsed.netloc
        path = utils.normpath(parsed.path)
        scheme_is_ok = parsed.scheme in ('', 'http', 'https')

        # example: http://agc012.contest.atcoder.jp/tasks/agc012_d
        dirname, basename = posixpath.split(path)
        is_old_style = (scheme_is_ok and netloc.count('.') == 3 and netloc.endswith('.contest.atcoder.jp') and netloc.split('.')[0] and dirname == '/tasks' and basename)
        if is_old_style:
            return cls(contest_id=netloc.split('.')[0], problem_id=basename)

        # example: https://beta.atcoder.jp/contests/abc073/tasks/abc073_a
        m = re.match(r'^/contests/([\w\-_]+)/tasks/([\w\-_]+)$', path)
        if scheme_is_ok and netloc in ('atcoder.jp', 'beta.atcoder.jp') and m:
            return cls(contest_id=m.group(1), problem_id=m.group(2))

        return None

    def download_input_format(self, *, session: Optional[requests.Session] = None) -> Optional[str]:
        """
        :raises Exception: if no such problem exists
        """
        data = self.download_data(session=session)
        return data.input_format

    def get_available_languages(self, *, session: Optional[requests.Session] = None) -> List[Language]:
        """
        :raises NotLoggedInError:
        """
        languages = self.download_data(session=session).available_languages
        if languages is not None:
            return languages
        log.error('not logged in')
        raise NotLoggedInError

    def submit_code(self, code: bytes, language_id: LanguageId, *, filename: Optional[str] = None, session: Optional[requests.Session] = None) -> 'AtCoderSubmission':
        """
        :raises NotLoggedInError:
        :raises SubmissionError:
        """
        session = session or utils.get_default_session()
        available_ids = [language.id for language in self.get_available_languages(session=session)]
        assert language_id in available_ids

        # get
        submit_path = '/contests/{}/submit'.format(self.contest_id)
        page = _request('GET', 'https://atcoder.jp/contests/{}/submit'.format(self.contest_id), session=session)

        # check whether logged in
        if 'login' in page.url:
            raise NotLoggedInError

        # parse
        document = bs4.BeautifulSoup(page.content.decode(page.encoding), utils.html_parser)
        form_tag = document.find('form', action=submit_path)
        if not form_tag:
            raise SubmissionError('something wrong')
        log.debug('form: %s', str(form_tag))

        # post
        sender = utils.FormSender(form_tag, url=page.url)
        sender.set('data.TaskScreenName', self.problem_id)
        sender.set('data.LanguageId', str(language_id))
        sender.set('sourceCode', code)
        result = sender.request(session=session)
        timestamp = datetime.datetime.now(datetime.timezone.utc).astimezone()
        _list_alert(result, print_=True)

        # result
        if '/submissions/me' not in result.url:
            raise SubmissionError('it may be a rate limit')
        # the first row of the resulting list of submissions is taken as the new submission
        contest = AtCoderContest(contest_id=self.contest_id)
        rows = contest._iterate_submission_data_from_response(resp=result, session=session, timestamp=timestamp)
        submission = next(rows).submission
        log.success('success: result: %s', submission.get_url())
        return submission

    def get_name(self, *, session: Optional[requests.Session] = None) -> str:
        data = self.download_data(session=session)
        return data.name

    def iterate_submissions(self, *, session: Optional[requests.Session] = None) -> Iterator['AtCoderSubmission']:
        """
        :note: in implementation, the submissions are requested in ascending order of creation time (`order='created'`, `desc=False`), presumably so that submissions made while crawling appear at the end of the list
        """
        contest = self.get_contest()
        yield from contest.iterate_submissions_where(problem_id=self.problem_id, order='created', desc=False, session=session)

    def iterate_submissions_where(self, **kwargs) -> Iterator['AtCoderSubmission']:
        """
        the same as :py:meth:`AtCoderContest.iterate_submissions_where`, with `problem_id` fixed to this problem
        """
        contest = self.get_contest()
        yield from contest.iterate_submissions_where(problem_id=self.problem_id, **kwargs)
class AtCoderSubmissionData(SubmissionData):
    """
    the data of a submission, as found in a row of the list of submissions

    :ivar code_size: :py:class:`int`
    :ivar exec_time_msec: :py:class:`Optional` [ :py:class:`int` ]
    :ivar language_name: :py:class:`str`
    :ivar memory_byte: :py:class:`Optional` [ :py:class:`int` ]
    :ivar problem: :py:class:`AtCoderProblem`
    :ivar problem_id: :py:class:`str`
    :ivar score: :py:class:`float`
    :ivar status: :py:class:`str`
    :ivar submission: :py:class:`AtCoderSubmission`
    :ivar submission_time: :py:class:`datetime.datetime`
    :ivar user_id: :py:class:`str`
    """

    # yapf: disable
    def __init__(
            self,
            *,
            code_size: int,
            exec_time_msec: Optional[int],
            language_name: str,
            memory_byte: Optional[int],
            problem: AtCoderProblem,
            problem_id: str,
            response: requests.Response,
            score: float,
            session: requests.Session,
            status: str,
            submission: 'AtCoderSubmission',
            submission_time: datetime.datetime,
            timestamp: datetime.datetime,
            user_id: str  # TODO: in Python 3.5, you cannot use both "*" and trailing ","
    ):
        # yapf: enable
        # values exposed via read-only properties
        self._problem = problem
        self._response = response
        self._session = session
        self._status = status
        self._submission = submission
        self._timestamp = timestamp
        # plain public attributes
        self.code_size = code_size
        self.exec_time_msec = exec_time_msec
        self.language_name = language_name
        self.memory_byte = memory_byte
        self.problem_id = problem_id
        self.score = score
        self.submission_time = submission_time
        self.user_id = user_id

    @property
    def status(self) -> str:
        return self._status

    @property
    def submission(self) -> 'AtCoderSubmission':
        return self._submission

    @property
    def problem(self) -> AtCoderProblem:
        # a new object is made from `problem_id` and the contest of the submission, instead of returning the object given to the constructor
        return AtCoderProblem(problem_id=self.problem_id, contest_id=self.submission.contest_id)

    @property
    def response(self) -> Optional[requests.Response]:
        return self._response

    @property
    def session(self) -> Optional[requests.Session]:
        return self._session

    @property
    def timestamp(self) -> Optional[datetime.datetime]:
        return self._timestamp

    @classmethod
    def _from_table_row(cls, tr: bs4.Tag, *, session: requests.Session, response: requests.Response, timestamp: datetime.datetime) -> 'AtCoderSubmissionData':
        """
        :param tr: a row with 8 or 10 cells; the execution time and the memory usage exist only in the rows with 10 cells
        """
        cells = tr.find_all('td')
        assert len(cells) in (8, 10)

        base = 'https://atcoder.jp'
        submission = AtCoderSubmission.from_url(base + cells[-1].find('a')['href'])
        problem = AtCoderProblem.from_url(base + cells[1].find('a')['href'])
        assert submission is not None
        assert problem is not None

        submission_time = datetime.datetime.strptime(cells[0].text, '%Y-%m-%d %H:%M:%S+0900').replace(tzinfo=utils.tzinfo_jst)
        problem_id = problem.problem_id
        # the user id is the last component of the first link in the cell
        user_link = cells[2].find_all('a')[0]['href']
        user_id = user_link.split('/')[-1]
        language_name = cells[3].text
        score = float(cells[4].text)
        code_size = int(utils.remove_suffix(cells[5].text, ' Byte'))
        status = cells[6].text

        exec_time_msec = None  # type: Optional[int]
        memory_byte = None  # type: Optional[int]
        if len(cells) == 10:
            exec_time_msec = int(utils.remove_suffix(cells[7].text, ' ms'))
            memory_byte = int(utils.remove_suffix(cells[8].text, ' KB')) * 1000

        return AtCoderSubmissionData(
            code_size=code_size,
            exec_time_msec=exec_time_msec,
            language_name=language_name,
            memory_byte=memory_byte,
            problem_id=problem_id,
            problem=problem,
            response=response,
            score=score,
            session=session,
            status=status,
            submission=submission,
            submission_time=submission_time,
            timestamp=timestamp,
            user_id=user_id,
        )
class AtCoderSubmissionDetailedData(AtCoderSubmissionData):
    """
    Submission data extended with what is only available on the page of the submission itself.

    :ivar compile_error: :py:class:`Optional` [ :py:class:`str` ]
    :ivar test_sets: :py:class:`Optional` [ :py:class:`List` [ :py:class:`AtCoderSubmissionTestSet` ] ]
    :ivar test_cases: :py:class:`Optional` [ :py:class:`List` [ :py:class:`AtCoderSubmissionTestCaseResult` ] ]

    The source code is exposed through the read-only ``source_code`` property.
    """

    # yapf: disable
    def __init__(
        self,
        *,
        source_code: bytes,
        compile_error: Optional[str],
        test_sets: Optional[List['AtCoderSubmissionTestSet']],
        test_cases: Optional[List['AtCoderSubmissionTestCaseResult']],
        **kwargs  # TODO: Python 3.5 rejects a trailing "," here, so none is written
    ):
        # yapf: enable
        # every remaining keyword argument is forwarded to AtCoderSubmissionData
        super().__init__(**kwargs)
        self.test_cases = test_cases
        self.test_sets = test_sets
        self.compile_error = compile_error
        self._source_code = source_code

    @property
    def source_code(self) -> bytes:
        return self._source_code
class AtCoderSubmission(onlinejudge.type.Submission):
    """
    A submission on AtCoder, identified by the contest it belongs to and its numeric id.

    :ivar contest_id: :py:class:`str`
    :ivar submission_id: :py:class:`int`
    """

    def __init__(self, *, contest_id: str, submission_id: int):
        self.contest_id = contest_id
        self.submission_id = submission_id

    @classmethod
    def from_url(cls, s: str) -> Optional['AtCoderSubmission']:
        """
        Parse the URL of a submission.

        Both the old form (``<contest>.contest.atcoder.jp/submissions/<id>``) and the current form
        (``atcoder.jp/contests/<contest>/submissions/<id>``, also on ``beta.atcoder.jp``) are accepted.

        :param s: the URL to parse
        :return: the submission, or :py:data:`None` if ``s`` is not recognized as a submission URL
        """
        result = urllib.parse.urlparse(s)
        path = utils.normpath(result.path)
        if result.scheme not in ('', 'http', 'https'):
            return None

        # example: http://agc001.contest.atcoder.jp/submissions/1246803
        dirname, basename = posixpath.split(path)
        subdomain = result.netloc.split('.')[0]
        if result.netloc.count('.') == 3 \
                and result.netloc.endswith('.contest.atcoder.jp') \
                and subdomain \
                and dirname == '/submissions':
            try:
                submission_id = int(basename)
            except ValueError:
                # an old-style host can never match the new-style pattern below, so give up here
                return None
            return cls(contest_id=subdomain, submission_id=submission_id)

        # example: https://beta.atcoder.jp/contests/abc073/submissions/1592381
        m = re.match(r'^/contests/([\w\-_]+)/submissions/(\d+)$', path)
        if m and result.netloc in ('atcoder.jp', 'beta.atcoder.jp'):
            try:
                submission_id = int(m.group(2))
            except ValueError:
                return None
            return cls(contest_id=m.group(1), submission_id=submission_id)

        return None

    def get_url(self, *, type: Optional[str] = None, lang: Optional[str] = None) -> str:
        """
        Build the URL of this submission.

        :param type: ``None`` or ``'beta'`` for the current URL form, ``'old'`` for the per-contest subdomain form
        :param lang: if given, appended to the URL as the ``lang`` query parameter
        :raises AssertionError: if ``type`` is none of the accepted values
        """
        if type is None or type == 'beta':
            url = 'https://atcoder.jp/contests/{}/submissions/{}'.format(self.contest_id, self.submission_id)
        elif type == 'old':
            url = 'https://{}.contest.atcoder.jp/submissions/{}'.format(self.contest_id, self.submission_id)
        else:
            # raised explicitly because an `assert` is removed under `python -O`, which would leave `url` unbound
            raise AssertionError('unknown URL type: {!r}'.format(type))
        if lang is not None:
            url += '?lang={}'.format(lang)
        return url

    def get_service(self) -> AtCoderService:
        return AtCoderService()

    def download_problem(self, *, session: Optional[requests.Session] = None) -> AtCoderProblem:
        """
        Download the submission page to find out which problem this submission is for.
        """
        problem_id = self.download_data(session=session).problem_id
        return AtCoderProblem(contest_id=self.contest_id, problem_id=problem_id)

    def get_problem(self) -> AtCoderProblem:
        """
        :raises Exception: always

        :note: There is no way to reconstruct problem_id without networking
        """
        raise Exception('the problem of an AtCoder submission cannot be determined without networking; use download_problem() instead')

    def download_data(self, *, session: Optional[requests.Session] = None) -> AtCoderSubmissionDetailedData:
        """
        Fetch the English page of this submission and parse it.

        :param session: the session to use; the default session of ``utils`` is used when omitted
        :raises AssertionError: if the page does not have the expected structure

        :note: `Exec Time` is undefined when the status is `RE` or `TLE`
        :note: `Memory` is undefined when the status is `RE` or `TLE`
        """
        session = session or utils.get_default_session()
        resp = _request('GET', self.get_url(type='beta', lang='en'), session=session)
        soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
        timestamp = datetime.datetime.now(datetime.timezone.utc).astimezone()

        # Submission #N
        id_, = soup.find_all('span', class_='h2')
        if id_.text != 'Submission #{}'.format(self.submission_id):
            raise AssertionError('unexpected page heading: {!r}'.format(id_.text))

        # Source Code
        source_code = soup.find(id='submission-code').text.encode()

        # get tables: either the info table alone, or the info table followed by the two test case tables
        tables = soup.find_all('table')
        if len(tables) == 3:
            submission_info, test_cases_summary, test_cases_data = tables
        elif len(tables) == 1:
            submission_info, = tables
            test_cases_summary = None
            test_cases_data = None
        else:
            # raised explicitly because an `assert` is removed under `python -O`
            raise AssertionError('unexpected number of tables: {}'.format(len(tables)))

        # Submission Info
        data = {}  # type: Dict[str, str]
        problem_id = None  # type: Optional[str]
        for tr in submission_info.find_all('tr'):
            key = tr.find('th').text.strip()
            data[key] = tr.find('td').text.strip()
            if key == 'Task':
                # the id of the problem is taken from the link, not from the displayed text
                problem = AtCoderProblem.from_url('https://atcoder.jp' + tr.find('a')['href'])
                if problem is None:
                    raise AssertionError('the link in the "Task" row is not a problem URL')
                problem_id = problem.problem_id
        if problem_id is None:
            raise AssertionError('the "Task" row is missing from the submission info')

        submission_time = datetime.datetime.strptime(data['Submission Time'], '%Y-%m-%d %H:%M:%S+0900').replace(tzinfo=utils.tzinfo_jst)
        user_id = data['User']
        language_name = data['Language']
        score = float(data['Score'])
        code_size = int(utils.remove_suffix(data['Code Size'], ' Byte'))
        status = data['Status']
        exec_time_msec = None  # type: Optional[int]
        if 'Exec Time' in data:
            exec_time_msec = int(utils.remove_suffix(data['Exec Time'], ' ms'))
        memory_byte = None  # type: Optional[int]
        if 'Memory' in data:
            # TODO: confirm this is KB truly, not KiB
            memory_byte = int(utils.remove_suffix(data['Memory'], ' KB')) * 1000

        # Compile Error
        compile_error = None  # type: Optional[str]
        compile_error_tag = soup.find('h4', text='Compile Error')
        if compile_error_tag is not None:
            compile_error = compile_error_tag.find_next_sibling('pre').text

        # Test Cases
        test_sets = None  # type: Optional[List[AtCoderSubmissionTestSet]]
        if test_cases_summary is not None:
            trs = test_cases_summary.find('tbody').find_all('tr')
            test_sets = [AtCoderSubmissionTestSet._from_table_row(tr) for tr in trs]
        test_cases = None  # type: Optional[List[AtCoderSubmissionTestCaseResult]]
        if test_cases_data is not None:
            trs = test_cases_data.find('tbody').find_all('tr')
            test_cases = [AtCoderSubmissionTestCaseResult._from_table_row(tr) for tr in trs]

        return AtCoderSubmissionDetailedData(
            code_size=code_size,
            compile_error=compile_error,
            exec_time_msec=exec_time_msec,
            language_name=language_name,
            memory_byte=memory_byte,
            problem=AtCoderProblem(contest_id=self.contest_id, problem_id=problem_id),
            problem_id=problem_id,
            response=resp,
            score=score,
            session=session,
            source_code=source_code,
            status=status,
            submission=self,
            submission_time=submission_time,
            test_cases=test_cases,
            test_sets=test_sets,
            timestamp=timestamp,
            user_id=user_id,
        )
class AtCoderSubmissionTestSet(object):
    """
    The result for one set of test cases, as shown in the summary table of a submission page.

    :ivar set_name: :py:class:`str`
    :ivar score: :py:class:`float`
    :ivar max_score: :py:class:`float`
    :ivar test_case_names: :py:class:`List` [ :py:class:`str` ]
    """

    def __init__(self, *, set_name: str, score: float, max_score: float, test_case_names: List[str]):
        self.test_case_names = test_case_names
        self.max_score = max_score
        self.score = score
        self.set_name = set_name

    @classmethod
    def _from_table_row(cls, tr: bs4.Tag) -> 'AtCoderSubmissionTestSet':
        """
        Build the object from a ``<tr>`` with three cells: the name, ``score/max_score``, and the comma separated names of test cases.
        """
        cells = tr.find_all('td')
        assert len(cells) == 3
        set_name = cells[0].text
        # the second cell looks like "score / max_score"; float() tolerates the surrounding spaces
        score, max_score = map(float, cells[1].text.split('/'))
        test_case_names = cells[2].text.split(', ')
        return AtCoderSubmissionTestSet(
            set_name=set_name,
            score=score,
            max_score=max_score,
            test_case_names=test_case_names,
        )
class AtCoderSubmissionTestCaseResult(object):
    """
    The result for a single test case, as shown in the per-case table of a submission page.

    :ivar case_name: :py:class:`str`
    :ivar status: :py:class:`str`
    :ivar exec_time_msec: :py:class:`int` in millisecond
    :ivar memory_byte: :py:class:`int` in byte
    """

    def __init__(self, *, case_name: str, status: str, exec_time_msec: Optional[int], memory_byte: Optional[int]):
        self.memory_byte = memory_byte
        self.exec_time_msec = exec_time_msec
        self.status = status
        self.case_name = case_name

    @classmethod
    def _from_table_row(cls, tr: bs4.Tag) -> 'AtCoderSubmissionTestCaseResult':
        """
        Build the object from a ``<tr>`` with either two cells (name, status) or four cells (name, status, time, memory).
        """
        cells = tr.find_all('td')
        case_name, status = cells[0].text, cells[1].text
        if len(cells) != 4:
            # rows without measurements have only the name and the status
            assert len(cells) == 2
            exec_time_msec = None  # type: Optional[int]
            memory_byte = None  # type: Optional[int]
        else:
            exec_time_msec = int(utils.remove_suffix(cells[2].text, ' ms'))
            memory_byte = int(utils.remove_suffix(cells[3].text, ' KB')) * 1000  # TODO: confirm this is KB truly, not KiB
        return AtCoderSubmissionTestCaseResult(
            case_name=case_name,
            status=status,
            exec_time_msec=exec_time_msec,
            memory_byte=memory_byte,
        )
# Append the AtCoder classes to the module-level lists of onlinejudge.dispatch.
# NOTE(review): presumably these lists are what the dispatcher scans when resolving a URL -- confirm in onlinejudge.dispatch.
onlinejudge.dispatch.services += [AtCoderService]
onlinejudge.dispatch.problems += [AtCoderProblem]
onlinejudge.dispatch.submissions += [AtCoderSubmission]