import asyncio
import os
from asgiref.sync import sync_to_async
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
class Crawler(object):
    """Drive a Chrome session over a list of pages and collect video links.

    ``path`` maps step names (``ROOT_PATH``, ``ROOT_PATH_QUESTIONS``,
    ``ROOT_PATH_ANSWERS``, ``ROOT_PATH_USER_LOGIN``) to lists of
    ``(marker_type, marker, vrange, action)`` tuples:

    * ``marker_type`` -- ``'selector'`` (CSS) or ``'xpath'``.
    * ``marker`` -- the selector / XPath expression.
    * ``vrange`` -- when >= 1, keep only the first ``vrange`` matches; any
      other value (e.g. ``-1``) keeps every match.
    * ``action`` -- ``'click'``, ``'text'``, ``'src'``, ``'username'`` or
      ``'password'`` (see ``_run_action``).

    ``launch`` fills ``all_video_data``; ``download_videos`` drains it into
    ``processed_video_data`` / ``failed_video_data``.
    """

    def __init__(self, url: str, path: dict, cookies: list = None,
                 headers: dict = None, test_mode: bool = False, timeout=30,
                 username=None, password=None) -> None:
        """Store the configuration; no browser is started here.

        Raises:
            ValueError: if ``path`` is empty and no module-level ``PATH``
                exists to fall back on.
        """
        self.url = url
        self.cookies = cookies
        self.headers = headers or {}
        self.test_mode = test_mode  # stored only; not read inside this class
        if not path:
            # Historical fallback: a module-level PATH exists only when this
            # file is run as a script. Look it up defensively so importing
            # the module yields a clear error instead of a NameError.
            path = globals().get('PATH')
        if not path:
            raise ValueError('path is required: no crawl path was supplied')
        self.path = path
        self.time_out = timeout
        self.username = username
        self.password = password
        self.all_video_data = []
        self.processed_video_data = []
        self.failed_video_data = []
        self.browser = None

    async def launch(self):
        """Open Chrome, authenticate (cookies or login), crawl, then quit.

        The browser is always quit and ``self.browser`` reset to ``None``,
        even when navigation, login or crawling raises; ``download_videos``
        relies on ``self.browser`` becoming ``None`` to terminate.
        """
        # NOTE(review): newer Selenium releases spell this keyword `options`
        # -- confirm against the installed Selenium version.
        self.browser = webdriver.Chrome(chrome_options=webdriver.ChromeOptions())
        try:
            self.browser.maximize_window()
            self.browser.implicitly_wait(self.time_out)
            self.browser.get(self.url)
            if self.cookies:
                for cookie in self.cookies:
                    self.browser.add_cookie(cookie)
                # Reload so the page is served with the cookies applied.
                self.browser.refresh()
            elif self.username and self.password:
                await self.login()
            await self.crawl()
        finally:
            # Clear the attribute first so a failing quit() cannot leave a
            # stale browser reference behind.
            browser, self.browser = self.browser, None
            browser.quit()

    async def login(self):
        """Run every step of ``ROOT_PATH_USER_LOGIN`` on its first match."""
        for sub_path in self.path['ROOT_PATH_USER_LOGIN']:
            targets, action = await self._get_markers(sub_path)
            await self._run_action(action, targets[0])

    async def get_video_link(self, sub_path):
        """Execute the steps in ``sub_path`` and return the video ``src``.

        A step whose markers cannot be resolved is reported on stdout and
        skipped (best effort). Returns the value of the last ``'src'``
        action that ran, or ``''`` when none did.
        """
        video_link = ''
        for step in sub_path:
            try:
                elements, action = await self._get_markers(step)
            except Exception as e:
                print(f'ERROR OCCURRED: Failed to execute self._get_markers({step}) - {e}')
                continue
            for element in elements:
                value = await self._run_action(action, element)
                if action == 'src':
                    video_link = value
        return video_link

    @sync_to_async
    def _run_action(self, action, element):
        """Apply ``action`` to ``element`` and return its result.

        ``'click'`` scrolls to the element, waits until it is enabled and
        clicks it; ``'text'`` / ``'src'`` return the ``innerText`` / ``src``
        attribute; ``'username'`` / ``'password'`` type the matching
        instance attribute into the element. Unknown actions return ``None``.
        """
        value = None
        if action == 'click':
            self.browser.execute_script("arguments[0].scrollIntoView();", element)
            value = WebDriverWait(self.browser, 30).until(
                lambda x=None: element
                if element and element.is_enabled()
                else False
            ).click()
        elif action == 'text':
            value = element.get_attribute('innerText')
        elif action == 'src':
            value = element.get_attribute('src')
        elif action in ('username', 'password'):
            value = WebDriverWait(self.browser, 15).until(
                lambda x=None: element
                if element and element.is_enabled() and element.is_displayed()
                else False
            ).send_keys(getattr(self, action))
        return value

    @sync_to_async
    def _get_markers(self, sub_path):
        """Resolve one path step to ``(elements, action)``.

        For a ``'src'`` action the first ``<iframe>`` on the page is entered
        before searching; for every other action the driver switches back to
        the parent frame first.

        Raises:
            IndexError: a ``'src'`` step was requested but no iframe exists.
            ValueError: ``marker_type`` is neither ``'selector'`` nor
                ``'xpath'``.
        """
        marker_type, marker, vrange, action = sub_path
        if action == 'src':
            iframes = self.browser.find_elements_by_tag_name("iframe")
            if not iframes:
                raise IndexError('No iframe found on the page for a src step')
            self.browser.switch_to.frame(iframes[0])
        else:
            self.browser.switch_to.parent_frame()
        if marker_type == 'selector':
            # An empty result list is falsy, so the wait keeps polling until
            # at least one element matches or the wait times out.
            targets = WebDriverWait(self.browser, 10).until(
                lambda x=None: self.browser.find_elements_by_css_selector(marker)
            )
        elif marker_type == 'xpath':
            targets = WebDriverWait(self.browser, 10).until(
                lambda x=None: self.browser.find_elements_by_xpath(marker),
                message=f'XPathError: {marker} not found.'
            )
        else:
            raise ValueError(f'Invalid marker type: {marker_type}')
        # Only a positive vrange limits the result; -1 means "keep all".
        # (The former `-1 <= vrange >= 1` chain reduced to exactly this.)
        if vrange >= 1:
            targets = targets[:vrange]
        print(targets)
        return targets, action

    async def crawl(self):
        """Visit every entry matched by ``ROOT_PATH[0]`` and record its links.

        For each entry: click it, read the title via ``ROOT_PATH[1]``, collect
        the question and answer video links, append a record to
        ``all_video_data`` and finally hide the entry's element.

        Raises:
            ValueError: no title element was found for an entry.
        """
        video_pages, action = await self._get_markers(self.path['ROOT_PATH'][0])
        print(self.path['ROOT_PATH'][0])
        print(len(video_pages))
        for i, page_element in enumerate(video_pages):
            self.browser.switch_to.parent_frame()
            self.browser.execute_script("arguments[0].scrollIntoView();", page_element)
            page_element.click()
            video_title_element, action = await self._get_markers(self.path['ROOT_PATH'][1])
            if not video_title_element:
                raise ValueError('No markers found')
            video_title = await self._run_action(action, video_title_element[0])
            video_question_link = await self.get_video_link(self.path['ROOT_PATH_QUESTIONS'])
            print(f'Found question src: {video_question_link}')
            video_answer_link = await self.get_video_link(self.path['ROOT_PATH_ANSWERS'])
            print(f'Found answer src: {video_answer_link}')
            self.all_video_data.append({
                'q_name': f'{i}_{video_title}_question',
                'q_link': video_question_link,
                'a_name': f'{i}_{video_title}_answer',
                'a_link': video_answer_link
            })
            self.browser.execute_script("arguments[0].style.display='none';", page_element)

    async def _download_one(self, video_data, prefix, download_dir, dry_run):
        """Download the ``prefix`` ('q' or 'a') video described by ``video_data``.

        On success the ``<prefix>_name`` / ``<prefix>_link`` keys are moved
        from ``video_data`` into ``processed_video_data``; on failure (or when
        either key is missing/empty) they are left in ``video_data``.
        """
        name_key = f'{prefix}_name'
        link_key = f'{prefix}_link'
        if not (video_data.get(name_key) and video_data.get(link_key)):
            return
        print(f"Downloading video...: {video_data[name_key]}")
        # Titles come from the crawled page: drop dots and neutralise path
        # separators so the name stays a single file inside download_dir.
        name = video_data[name_key].replace('.', '').replace(os.sep, '_').strip()
        file_name = os.path.join(download_dir, f"{name}.mp4")
        link = video_data[link_key]
        if dry_run:
            print(f"Issuing command: wget {link} -O '{file_name}'")
            return_code = 0
        else:
            try:
                # Argument vector instead of a shell string: the title and
                # link are page-controlled and must not be shell-interpreted.
                process = await asyncio.create_subprocess_exec(
                    'wget', link, '-O', file_name
                )
                # returncode is None until the process has been waited for.
                return_code = await process.wait()
            except OSError as e:
                print(f'ERROR OCCURRED: Failed to run wget - {e}')
                return_code = -1
        if return_code == 0:
            self.processed_video_data.append({
                name_key: video_data.pop(name_key),
                link_key: video_data.pop(link_key)
            })

    async def download_videos(self, dry_run=False):
        """Drain ``all_video_data``, downloading each question/answer video.

        Files go to ``~/Downloads/Linkedin-Interview-Prep``. The loop keeps
        polling (every 2 seconds) while there is queued data or the browser
        is still open. With ``dry_run=True`` the wget command is only printed
        and the item is counted as processed. Whatever could not be
        downloaded for an item is appended to ``failed_video_data``.
        """
        download_dir = os.path.join(
            os.path.expanduser('~'), 'Downloads', 'Linkedin-Interview-Prep'
        )
        # makedirs also creates a missing ~/Downloads and tolerates the
        # directory already existing.
        os.makedirs(download_dir, exist_ok=True)
        print(self.all_video_data)
        while self.all_video_data or self.browser:
            print('Downloading...')
            if self.all_video_data:
                video_data = self.all_video_data.pop(0)
                await self._download_one(video_data, 'q', download_dir, dry_run)
                await self._download_one(video_data, 'a', download_dir, dry_run)
                # Successful downloads pop their keys; only leftovers failed.
                if video_data:
                    self.failed_video_data.append(video_data)
            await asyncio.sleep(2)
if __name__ == '__main__':
    # Each step is (marker_type, marker, vrange, action); a vrange of -1
    # keeps every matched element, a positive one keeps the first N.
    PATH = dict(
        ROOT_PATH=[
            ('selector', '#ember60 > section > div > div > div > ol > li', -1,
             'click'),
            ('selector', (
                '#ember156 > div.interview-prep-question-details__container.container'
                '-with-shadow.p0.mb4 > header > h1'), 1, 'text')
        ],
        ROOT_PATH_QUESTIONS=[
            ('xpath', (
                '/html/body/div[6]/div[4]/div[3]/div[1]/div/article/div[1]/div[2]/article/button'),
             1, 'click'),
            ('xpath', '/html/body/div[4]/div/div/div[2]/div/div[1]/button', 1,
             'click'),
            ('xpath', '/html/body/div/div/div[5]/div[1]/video', 1, 'src'),
            ('selector', 'button[data-test-modal-close-btn=""]', 1, 'click')
        ],
        ROOT_PATH_ANSWERS=[
            ('xpath', (
                '/html/body/div[6]/div[4]/div[3]/div[1]/div/article/div[1]/div[3]/article/button'),
             1, 'click'),
            ('xpath', '/html/body/div[4]/div/div/div[2]/div/div[1]/button', 1,
             'click'),
            ('xpath', '/html/body/div/div/div[5]/div[1]/video', 1, 'src'),
            ('selector', 'button[data-test-modal-close-btn=""]', 1, 'click')
        ],
        ROOT_PATH_USER_LOGIN=[
            ('xpath', '/html/body/div/main/p/a', 1, 'click'),
            ('selector', '#username', 1, 'username'),
            ('selector', '#password', 1, 'password'),
            ('selector', '#app__container > main > div > form > div.login__form_action_container > button', 1, 'click')
        ],
    )
    COOKIES = None
    APP_MODE = True
    site_url = ('https://www.linkedin.com/interview-prep/assessments'
                '/urn:li:fs_assessment:(1,a)/question'
                '/urn:li:fs_assessmentQuestion:(10011,aq11)/')
    # Credentials are read from the environment, never hard-coded.
    username = os.getenv('LINKEDIN_USER')
    password = os.getenv('LINKEDIN_PASS')
    crawler = Crawler(site_url, PATH, COOKIES, username=username,
                      password=password, test_mode=APP_MODE, timeout=5)

    async def _run_all():
        """Run the crawl and the (dry-run) downloader concurrently."""
        await asyncio.gather(
            crawler.launch(),
            crawler.download_videos(dry_run=True),
        )

    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(_run_all())
        print(f"Unprocessed Videos:{len(crawler.all_video_data)} {crawler.all_video_data}\n"
              f"Processed Videos:{len(crawler.processed_video_data)} {crawler.processed_video_data}\n"
              f"Failed Videos:{len(crawler.failed_video_data)} {crawler.failed_video_data}\n")
    finally:
        # Close the loop even when the crawl raises.
        loop.close()