diff --git a/.gitignore b/.gitignore index ace90b6..021a48c 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,7 @@ log # kafka data kafka-data -aws_credentials \ No newline at end of file +aws_credentials +brickstudy_ingestion/dags/viral/tmp +brickstudy_ingestion/src/scrapper/results +.DS_Store \ No newline at end of file diff --git a/brickstudy_ingestion/dags/utils/config.py b/brickstudy_ingestion/dags/utils/config.py index bd1c384..a2f96a6 100644 --- a/brickstudy_ingestion/dags/utils/config.py +++ b/brickstudy_ingestion/dags/utils/config.py @@ -24,7 +24,9 @@ def set_env_variables(): "TWITTER_CLIENT_ID", "TWITTER_CLIENT_PASSWORD", "TWITTER_TOKEN" - "TWITTER_CRAWLER_AUTH_TOKEN_PASSWORD" + # Instagram + "INSTAGRAM_CLIENT_ID", + "INSTAGRAM_CLIENT_PASSWORD" ] for ENV_VARIABLE in ALL_ENV_VARIABLES: os.environ[ENV_VARIABLE] = Variable.get(ENV_VARIABLE, "") diff --git a/brickstudy_ingestion/dags/viral/instagram_crawler.py b/brickstudy_ingestion/dags/viral/instagram_crawler.py new file mode 100644 index 0000000..e8ca249 --- /dev/null +++ b/brickstudy_ingestion/dags/viral/instagram_crawler.py @@ -0,0 +1,89 @@ +from datetime import timedelta + +from airflow import DAG +from airflow.utils.dates import days_ago +from airflow.operators.python import PythonVirtualenvOperator, PythonOperator +from airflow.models import Variable + +from src.scrapper.brand_name_getter import get_brand_list_fr_s3 + +# ========================================= +# Change parameter +DAG_ID = "bronze_viral_instagram" +TARGET_PLATFORM = 'instagram' + +# Set aiflow setting +default_args = { + 'owner': 'brickstudy', + 'start_date': days_ago(0), + 'retries': 1, + 'retry_delay': timedelta(minutes=1), + # 'on_failure_callback': on_failure_callback, +} +# ========================================= + + +def get_brand_list(): + import os + for ENV_VARIABLE in ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY']: + os.environ[ENV_VARIABLE] = Variable.get(ENV_VARIABLE, "") + return get_brand_list_fr_s3() + + +def instagram_crawling(brand_lst, id, pwd): + import os + import logging + from src.common.kafka.utils import Kafka + from src.scrapper.inscrawler import InsCrawler + from src.scrapper.ins_url import InsURLCrawler + from src.scrapper.ins_data import InsDataCrawler + + os.environ['INSTAGRAM_CLIENT_ID'] = id + os.environ['INSTAGRAM_CLIENT_PASSWORD'] = pwd + + def crawl_instagram(keywords: tuple): + crawler = InsURLCrawler(InsCrawler(keywords=keywords)).get_urls() + post_crawler = InsDataCrawler(crawler.data) + post_crawler.get_post_data() + producer.send_data_to_kafka( + kafka_topic='instagram', + data=post_crawler.data + ) + + try: + producer = Kafka() + crawl_instagram(brand_lst) + except Exception as e: + logging.error("***entrypoint error***", e) + raise + + +with DAG( + dag_id=DAG_ID, + default_args=default_args, + schedule_interval='@daily', + catchup=False +): + t1 = PythonOperator( + task_id='get_brand_list_from_s3', + python_callable=get_brand_list + ) + + t2 = PythonVirtualenvOperator( + task_id='crawl_instagram_based_on_keyword', + system_site_packages=False, + op_kwargs={ + 'brand_lst': "{{ ti.xcom_pull(task_ids='get_brand_list_from_s3') }}", + 'id': Variable.get('INSTAGRAM_CLIENT_ID'), + 'pwd': Variable.get('INSTAGRAM_CLIENT_PASSWORD') + }, + python_version='3.10', + system_site_packages=False, + requirements=['selenium==4.24.0', 'webdriver-manager==4.0.2', + 'bs4==0.0.2', 'beautifulsoup4==4.12.3', + 'lxml==5.3.0', 'pytz==2024.1', + "python-dotenv==0.19.0", "multiprocess", "kafka-python"], + 
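The `op_kwargs` above are Jinja-templated, so unless the DAG opts into native object rendering (`render_template_as_native_obj=True`), `brand_lst` typically reaches the virtualenv callable as the *string* rendering of the pulled XCom rather than a Python list. A minimal defensive parse the callable could apply — `normalize_brand_lst` is a hypothetical helper, not part of this patch:

```python
import ast

def normalize_brand_lst(brand_lst):
    """Accept either a real list or the string rendering produced by XCom templating."""
    if isinstance(brand_lst, str):
        try:
            parsed = ast.literal_eval(brand_lst)
        except (ValueError, SyntaxError):
            return [brand_lst]  # a plain keyword string; wrap it as a single-item list
        return list(parsed) if isinstance(parsed, (list, tuple, set)) else [parsed]
    return list(brand_lst)
```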
python_callable=instagram_crawling + ) + + t1 >> t2 \ No newline at end of file diff --git a/brickstudy_ingestion/dags/viral/twitter_crawler.py b/brickstudy_ingestion/dags/viral/twitter_crawler.py index 90970f9..8efd411 100644 --- a/brickstudy_ingestion/dags/viral/twitter_crawler.py +++ b/brickstudy_ingestion/dags/viral/twitter_crawler.py @@ -1,4 +1,5 @@ from datetime import datetime +import os from airflow import DAG from airflow.models import Variable @@ -20,11 +21,11 @@ } # ========================================= - OUTPUT_FILENAME = "test.csv" SEARCH_KEYWORD = "enhypen" LIMIT = 10 TOKEN = Variable.get("TWITTER_CRAWLER_AUTH_TOKEN_PASSWORD") +HOST_BASE_PATH = '/Users/seoyeongkim/Documents/ETL' with DAG( dag_id=DAG_ID, @@ -36,15 +37,17 @@ task_id='t_docker', image='brickstudy/twitter_crawler:latest', container_name='twitter_crawler', + api_version='1.37', auto_remove=True, mount_tmp_dir=False, mounts=[ - Mount(source="/opt/airflow/logs/tweets-data", target="/app/tweets-data", type="bind"), + Mount(source=f"{HOST_BASE_PATH}/logs", target="/app/tweets-data", type="bind"), ], command=[ + "bash", "-c", f"npx --yes tweet-harvest@latest -o {OUTPUT_FILENAME} -s {SEARCH_KEYWORD} -l {LIMIT} --token {TOKEN}" ], - docker_url='unix://var/run/docker.sock', + docker_url='tcp://docker-socket-proxy:2375', network_mode='bridge', ) diff --git a/brickstudy_ingestion/src/scrapper/__init__.py b/brickstudy_ingestion/src/scrapper/__init__.py index 673266f..fd40910 100644 --- a/brickstudy_ingestion/src/scrapper/__init__.py +++ b/brickstudy_ingestion/src/scrapper/__init__.py @@ -1,38 +1,4 @@ -import urllib -from urllib.request import urlopen -from urllib.error import HTTPError, URLError -from bs4 import BeautifulSoup -import random -import time -from src.common.exception import ExtractError -def get_soup(url: str = None): - user_agent_lst = ['Googlebot', 'Yeti', 'Daumoa', 'Twitterbot'] - user_agent = user_agent_lst[random.randint(0, len(user_agent_lst) - 1)] - headers = {'User-Agent': user_agent} - try: - req = urllib.request.Request(url, headers=headers) - page = urlopen(req) - html = page.read().decode("utf-8") - soup = BeautifulSoup(html, "html.parser") - except (HTTPError, URLError) as e: - err = ExtractError( - code=000, - message=f"**{url}** HTTPError/URLError. Sleep 5 and continue.", - log=e - ) - time.sleep(5) # TODO 이 경우 해당 url에 대해 재실행 필요 - except (ValueError) as e: - err = ExtractError( - code=000, - message=f"**{url}** ValueError. 
Ignore this url parameter.", - log=e - ) - print(err) - soup = None # TODO 해당 url 무시 - else: - time.sleep(random.random()) - return soup diff --git a/brickstudy_ingestion/src/scrapper/brand_name_getter.py b/brickstudy_ingestion/src/scrapper/brand_name_getter.py new file mode 100644 index 0000000..8b045ff --- /dev/null +++ b/brickstudy_ingestion/src/scrapper/brand_name_getter.py @@ -0,0 +1,61 @@ +import json + +from src.common.aws.s3_uploader import S3Uploader +from src.scrapper.models import OliveyoungBrand + + +def get_latest_dt(): + return '2024-08-20' + + +def category_checker(category: list) -> bool: + """ + standard 기준 카테고리에 하나라도 속해있으면 True 반환, 아니라면 False 반환 + """ + compare = set([c.split('_')[0] for c in category]) + standard = {'메이크업', '스킨케어', '향수', '헤어케어', '바디케어', '마스크팩', + '클렌징', '선케어', '더모코스메틱', '맨즈케어'} + if len(compare & standard) > 0: + return True + return False + + +def filter_brand(file_content: str) -> list: + filtered = [] + for line in file_content.split('\n'): + if line == '': + break + for brandname, brandinfo in json.loads(line).items(): + brandinfo_dic = OliveyoungBrand(**brandinfo) + if category_checker(brandinfo_dic.category): + filtered.append(brandname) + return filtered + + +def get_brand_list_fr_s3(): + s3_client = S3Uploader().s3_client + bucket = 'brickstudy' + + def file_keys_getter(): + paginator = s3_client.get_paginator('list_objects_v2') + prefix = f"bronze/viral/oliveyoung/{get_latest_dt()}" + file_key_lst = [] + for page in paginator.paginate( + Bucket=bucket, + Prefix=prefix + ): + if 'Contents' in page: + for obj in page['Contents']: + file_key_lst.append(obj['Key']) + return file_key_lst + + file_key_lst = file_keys_getter() + filtered_brand_lst = [] + for filekey in file_key_lst: + response = s3_client.get_object( + Bucket=bucket, + Key=filekey + ) + file_content = response['Body'].read().decode('utf-8') + filtered_brand_lst += filter_brand(file_content) + return filtered_brand_lst \ No newline at end of file diff --git a/brickstudy_ingestion/src/scrapper/browser.py b/brickstudy_ingestion/src/scrapper/browser.py deleted file mode 100644 index d2bc7bf..0000000 --- a/brickstudy_ingestion/src/scrapper/browser.py +++ /dev/null @@ -1,103 +0,0 @@ -import os - -from selenium import webdriver -from selenium.common.exceptions import NoSuchElementException -from selenium.common.exceptions import TimeoutException -from selenium.webdriver.chrome.options import Options -from selenium.webdriver.common.by import By -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.support.ui import WebDriverWait -# from selenium.webdriver.common.keys import Keys -from fake_useragent import UserAgent - -from utils import randmized_sleep - - -class Browser: - def __init__(self, has_screen): - dir_path = os.path.dirname(os.path.realpath(__file__)) - service_args = ["--ignore-ssl-errors=true"] - chrome_options = Options() - if not has_screen: - chrome_options.add_argument("--headless") - chrome_options.add_argument("--start-maximized") - chrome_options.add_argument("--no-sandbox") - chrome_options.add_argument("user-agent=" + UserAgent().random) - self.driver = webdriver.Chrome( - executable_path=f"{dir_path}/bin/chromedriver", - service_args=service_args, - chrome_options=chrome_options, - ) - self.driver.implicitly_wait(5) - - @property - def page_height(self): - return self.driver.execute_script("return document.body.scrollHeight") - - def get(self, url): - self.driver.get(url) - - @property - def current_url(self): - return 
self.driver.current_url - - def implicitly_wait(self, t): - self.driver.implicitly_wait(t) - - def find_one(self, css_selector, elem=None, waittime=0): - obj = elem or self.driver - - if waittime: - WebDriverWait(obj, waittime).until( - EC.presence_of_element_located((By.CSS_SELECTOR, css_selector)) - ) - - try: - return obj.find_element(By.CSS_SELECTOR, css_selector) - except NoSuchElementException: - return None - - def find(self, css_selector, elem=None, waittime=0): - obj = elem or self.driver - - try: - if waittime: - WebDriverWait(obj, waittime).until( - EC.presence_of_element_located((By.CSS_SELECTOR, css_selector)) - ) - except TimeoutException: - return None - - try: - return obj.find_elements(By.CSS_SELECTOR, css_selector) - except NoSuchElementException: - return None - - def scroll_down(self, wait=0.3): - self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight)") - randmized_sleep(wait) - - def scroll_up(self, offset=-1, wait=2): - if offset == -1: - self.driver.execute_script("window.scrollTo(0, 0)") - else: - self.driver.execute_script("window.scrollBy(0, -%s)" % offset) - randmized_sleep(wait) - - def js_click(self, elem): - self.driver.execute_script("arguments[0].click();", elem) - - def open_new_tab(self, url): - self.driver.execute_script("window.open('%s');" % url) - self.driver.switch_to.window(self.driver.window_handles[1]) - - def close_current_tab(self): - self.driver.close() - - self.driver.switch_to.window(self.driver.window_handles[0]) - - def __del__(self): - try: - self.driver.quit() - except Exception: - pass diff --git a/brickstudy_ingestion/src/scrapper/http_429_handler.py b/brickstudy_ingestion/src/scrapper/http_429_handler.py new file mode 100644 index 0000000..d1768ff --- /dev/null +++ b/brickstudy_ingestion/src/scrapper/http_429_handler.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 + +import urllib.error +import urllib.request + +from tenacity import retry, retry_if_exception, stop_after_attempt, wait_fixed +from tenacity.wait import wait_base + + +class retry_if_http_429_error(retry_if_exception): + """Retry strategy that retries if the exception is an ``HTTPError`` with + a 429 status code. + + """ + + def __init__(self): + def is_http_429_error(exception): + return ( + isinstance(exception, urllib.error.HTTPError) and + exception.getcode() == 429 + ) + + super().__init__(predicate=is_http_429_error) + + +class wait_for_retry_after_header(wait_base): + """Wait strategy that tries to wait for the length specified by + the Retry-After header, or the underlying wait strategy if not. + See RFC 6585 § 4. + + Otherwise, wait according to the fallback strategy. + """ + def __init__(self, fallback): + self.fallback = fallback + + def __call__(self, retry_state): + # retry_state is an instance of tenacity.RetryCallState. The .outcome + # property is the result/exception that came from the underlying function. 
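+        # Note that Retry-After may also be an HTTP-date rather than a number of
+        # seconds; int() raises ValueError for that form, and the handler then
+        # falls back to the fallback wait strategy passed to __init__.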
+ exc = retry_state.outcome.exception() + if isinstance(exc, urllib.error.HTTPError): + retry_after = exc.headers.get("Retry-After") + + try: + return 3600 if retry_after is None else int(retry_after) + except (TypeError, ValueError): + pass + + return self.fallback(retry_state) + + +@retry( + retry=retry_if_http_429_error(), + wait=wait_for_retry_after_header(fallback=wait_fixed(1)), + stop=stop_after_attempt(3) +) +def get_url_with_tenacity_(url): + return urllib.request.urlopen(url) \ No newline at end of file diff --git a/brickstudy_ingestion/src/scrapper/ins_data.py b/brickstudy_ingestion/src/scrapper/ins_data.py new file mode 100644 index 0000000..d44ed08 --- /dev/null +++ b/brickstudy_ingestion/src/scrapper/ins_data.py @@ -0,0 +1,146 @@ +import time +from bs4 import BeautifulSoup +from selenium.webdriver.support.wait import WebDriverWait +from selenium.webdriver.common.by import By +import requests +import re +import random + +from src.scrapper.inscrawler import InsCrawler +from src.scrapper.http_429_handler import get_url_with_tenacity_ + + +class InsDataCrawler(InsCrawler): + def __init__(self, + driver, data, + dev: bool = False): + super().__init__(dev=dev, driver=driver) + self.data = data + self.headers = { + 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36' + } + + def get_post_data(self): + results = f'{self.base_path}/results/data.txt' + with open(results, 'r') as f: + post_crawled_data = {line.strip() for line in f} + + for idx, (key, val) in enumerate(self.data.items()): + if self.numof_error > 5: break + + post_url = val.post_url + + if post_url in post_crawled_data: + continue + + time.sleep(random.randrange(2, 10) + random.random()) + get_url_with_tenacity_(post_url) + self.driver.get(post_url) + print(idx, '. ' + post_url) + + try: + html = self.driver.page_source + soup = BeautifulSoup(html, 'lxml') + + # 작성자 + username = soup.find('span', {'class': '_ap3a _aaco _aacw _aacx _aad7 _aade'}).text + print(username, end=' ') + self.data[key].username = username + + # 작성일자 + date = soup.find_all('time')[-1]['datetime'][:10] + print(date, end=' ') + self.data[key].date = date + + # like 개수 + try: + like = soup.find('span', {'class': 'html-span xdj266r x11i5rnm xat24cr x1mh8g0r xexx8yu x4uap5 x18d9i69 xkhd6sd x1hl2dhg x16tdsg8 x1vvkbs'}).text + except Exception: + like = 'no data' # ~~외 여러 명이 좋아합니다. 같은 경우 + print(like) + self.data[key].like = like + + # 이미지 저장 + images = [] + img_urls = set() + images.append(self.driver.find_elements(By.CLASS_NAME, 'x5yr21d.xu96u03.x10l6tqk.x13vifvy.x87ps6o.xh8yej3')) + + for i in range(len(images)): + for j in range(len(images[i])): + + if j >= 3: # 4번째부터 타 게시물의 썸네일 이미지 + break + + alt = images[i][j].get_attribute('alt') + check = re.findall(r'by .+? on', alt) # 타 게시물인지 아닌지 검사 + + if check != []: + img_urls.add(images[i][j].get_attribute('src')) + + # 이미지 끝까지 넘기면서 url 추출 + try: + while True: + time.sleep(random.randrange(3, 6) + random.random()) + WebDriverWait(self.driver, random.randrange(1, 4)) + self.driver.find_element(By.CLASS_NAME, '_afxw._al46._al47').click() # 다음 이미지 버튼 클릭 + images.append(self.driver.find_elements(By.CLASS_NAME, 'x5yr21d.xu96u03.x10l6tqk.x13vifvy.x87ps6o.xh8yej3')) + + for i in range(len(images)): + for j in range(len(images[i])): + + if j >= 3: # 4번째부터 타 게시물의 썸네일 이미지 + break + + alt = images[i][j].get_attribute('alt') + check = re.findall(r'by .+? 
on', alt) # 타 게시물인지 아닌지 검사 + + if check != []: + img_urls.add(images[i][j].get_attribute('src')) + + images.clear() + + except Exception: + print('더 이상 넘길 이미지 없음') + + img_urls = list(img_urls) + print(img_urls) + images.clear() + + saved_imgs = set() + for img_url in img_urls: + # 이미지만 고려. 우선 비디오 타입은 고려하지 않음. + pattern = r'\/v\/[^\/]+\/([^\/\?]+)\.(jpg|png|webp|heic)' + match = re.search(pattern, img_url) + if match: + img_name = match.group(1) + '.' + match.group(2) + else: + print('파일을 찾을 수 없거나 jpg 혹은 png, webp, heic 파일이 아님.') + continue + + if img_name not in saved_imgs: + response = requests.get(img_url, headers=self.headers, timeout=20) + + with open(f'{self.base_path}/results/images/' + img_name, 'wb') as f: + f.write(response.content) + + saved_imgs.add(img_name) + + time.sleep(.5) + + print(f"총 {len(saved_imgs)} 장의 이미지 저장") + self.data[key].saved_imgs = str(list(saved_imgs)) + + time.sleep(random.randrange(3, 5)) + + except Exception as e: + print(e) + self.numof_error += 1 + print('오류 발생') + time.sleep(20 + random.random()) + + # 수집 완료된 데이터 키값(post url unique id) 저장 + with open(results, 'a') as f: + for key in self.data.keys(): + f.write(key + '\n') + + self.driver.close() diff --git a/brickstudy_ingestion/src/scrapper/ins_runner.py b/brickstudy_ingestion/src/scrapper/ins_runner.py new file mode 100644 index 0000000..073cbb2 --- /dev/null +++ b/brickstudy_ingestion/src/scrapper/ins_runner.py @@ -0,0 +1,128 @@ +from src.scrapper.brand_name_getter import get_brand_list_fr_s3 +from src.scrapper.ins_url import InsURLCrawler +from src.scrapper.ins_data import InsDataCrawler +from src.scrapper.utils import write_local_as_json +from src.scrapper.utils import current_datetime_getter + +import os +import logging +import subprocess + +logger = logging.getLogger('insrunner') +logger.setLevel(logging.ERROR) + +twitter_keyword = [ + "닥터지", "아이소이", "에뛰드", "에스트라", "유세린", "토리든" +] + + +def get_brand_lst_wo_ingested_list(): + brand_lst = get_brand_list_fr_s3() + with open(f"{base_path}/results/finished_brand.txt", "r") as f: + skip = f.read() + return list(set(brand_lst) - set(skip[:-1].split('\n'))) + + +def crawl_data(brand_lst: list, err: int): + """ + brand_lst 에 속한 brand이 언급된 데이터를 인스타그램으로부터 수집하여 + ./results/data, ./results/images에 저장하는 함수 + :brand_lst: 크롤링할 서치 키워드가 담긴 리스트 + :err: 크롤링 진행 과정에서 발생한 오류 횟수 + """ + for brand in brand_lst: + if err > 10: + break + + crawler = InsURLCrawler(dev=True) + crawler.get_urls(keyword=brand) + crawler.materialize() + err += crawler.numof_error + + post_crawler = InsDataCrawler( + driver=crawler.driver, + data=crawler.data, + dev=True + ) + post_crawler.get_post_data() + err += post_crawler.numof_error + + try: + cur_date = current_datetime_getter() + write_local_as_json( + data=post_crawler.data, + file_path=f"{post_crawler.base_path}/results/data", + file_name=f"instagram_{cur_date}" + ) + with open(f"{post_crawler.base_path}/results/finished_brand.txt", "a") as f: + f.write(f"{brand}\n") + except Exception as e: + logging.error( + "{} data write 과정에서 오류 발생. 
\nerror message: {}".format(brand, e) + ) + + return err + + +def s3_upload(local_path: str, target: str = 'data'): + local_folder = os.path.join(local_path, target) + dt = current_datetime_getter() + dt = dt.split('_')[0] + s3_folder = f"bronze/viral/instagram/{dt[:4]}-{dt[4:6]}-{dt[6:]}/{target}" + bucket_name = "brickstudy" + try: + subprocess.run( + ['aws', 's3', 'cp', local_folder, f's3://{bucket_name}/{s3_folder}/', '--recursive'], + check=True + ) + print(f"Folder {local_folder} uploaded to s3://{bucket_name}/{s3_folder}/") + except subprocess.CalledProcessError as e: + print(f"Failed to upload folder: {str(e)}") + + +if __name__ == '__main__': + base_path = "/Users/seoyeongkim/Documents/ETL/brickstudy_ingestion/src/scrapper/" + # shutil.rmtree(base_path + "results/data") + # shutil.rmtree(base_path + "results/images") + # os.mkdir(base_path + "results/data") + # os.mkdir(base_path + "results/images") + + err = 0 + # brand_lst = get_brand_lst_wo_ingested_list() + + brand_lst = twitter_keyword + for block_s in range(0, len(brand_lst), 10): + partitioned = brand_lst[block_s:block_s + 10] + print(f"**** start crawling {partitioned} ****") + err += crawl_data(brand_lst[block_s:block_s + 10], err) + + # s3_upload(base_path + "results", 'data') + # s3_upload(base_path + "results", 'images') + + +""" +curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://kafka-connect:8083/connectors/sink-s3-voluble/config -d '{ + "connector.class": "io.confluent.connect.s3.S3SinkConnector", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "tasks.max": 1, + "topics": "instagram", + "aws.signing_region": "ap-northeast-2", + "s3.part.size": 5242880, + "s3.region": "ap-northeast-2", + "s3.bucket.name": "brickstudy", + "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain", + "topics.dir": "bronze/viral", + "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner", + "partition.duration.ms": "86400000", + "timestamp.extractor": "Record", + "path.format": "yyyy-MM-dd", + "flush.size": 100, + "rotate.interval.ms": 60000, + "storage.class": "io.confluent.connect.s3.storage.S3Storage", + "format.class": "io.confluent.connect.s3.format.json.JsonFormat", + "locale": "ko_KR", + "timezone": "Asia/Seoul" +}' +""" \ No newline at end of file diff --git a/brickstudy_ingestion/src/scrapper/ins_url.py b/brickstudy_ingestion/src/scrapper/ins_url.py new file mode 100644 index 0000000..7cdb1cb --- /dev/null +++ b/brickstudy_ingestion/src/scrapper/ins_url.py @@ -0,0 +1,72 @@ +import time +from bs4 import BeautifulSoup +import urllib +import re +import random + +from src.scrapper.inscrawler import InsCrawler +from src.scrapper.http_429_handler import get_url_with_tenacity_ + + +class InsURLCrawler(InsCrawler): + def __init__(self, keywords: list = None, dev: bool = False): + super().__init__(keywords, dev) + + def get_urls(self, keyword: str = None): + if keyword is not None: # execute with given keyword + self._fetch_url_data(keyword) + else: # execute with entire keywords + for keyword in self.keywords: + self._fetch_url_data(keyword) + + def _fetch_url_data(self, keyword): + word = urllib.parse.quote(keyword) + word_url = f'https://www.instagram.com/explore/tags/{word}/' + get_url_with_tenacity_(word_url) + self.driver.get(word_url) + + try: + for _ in range(10): # 스크롤 10회 + 
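+                # Each pass scrolls the hashtag page further down so Instagram
+                # lazy-loads additional posts before the page source is parsed.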
time.sleep(random.randrange(3, 4) + random.random()) + js = 'window.scrollBy(0,7000)' + self.driver.execute_script(js) + time.sleep(random.randrange(3, 4) + random.random()) + html = self.driver.page_source + soup = BeautifulSoup(html, 'lxml') + + divimg = soup.find_all('img', {'class': 'x5yr21d xu96u03 x10l6tqk x13vifvy x87ps6o xh8yej3'}) + if not divimg: + print('이미지를 찾을 수 없습니다.') + raise Exception + + for div in divimg: + content = div.get('alt') + if not content: + print('내용이 없습니다.') + continue + + a = div.find_parent('a') + if a is None: + print('게시물 링크가 잘못되었습니다.') + continue + urlto = a.get('href') + if urlto is None: + print('게시물 링크가 없습니다.') + continue + totalurl = 'https://www.instagram.com' + urlto + self.data[urlto].brand = keyword + self.data[urlto].post_url = totalurl + + modified_content = re.sub(r'\s*\n\s*', ' ', content) + self.data[urlto].full_text = modified_content + + print(f'페이지 {keyword}에서 데이터를 가져오는 중...') + time.sleep(5) + + except Exception as e: + self.numof_error += 1 + print(e) + print('오류 발생') + + print(f'키워드 {keyword}의 URL 정보 수집 완료.') + self.driver.close() \ No newline at end of file diff --git a/brickstudy_ingestion/src/scrapper/inscrawler.py b/brickstudy_ingestion/src/scrapper/inscrawler.py index 25de1e0..ec2dc48 100644 --- a/brickstudy_ingestion/src/scrapper/inscrawler.py +++ b/brickstudy_ingestion/src/scrapper/inscrawler.py @@ -1,43 +1,92 @@ import os +import time +import json +from collections import defaultdict +import random -from src.common.exception import RetryException +from src.scrapper.models import inst_generator +from src.scrapper.utils import get_driver -from browser import Browser -from utils import retry +class InsCrawler: + def __init__(self, + keywords: list = None, + dev: bool = False, + driver=None): + self.account_x = random.randrange(0, 2) + if dev: + proj_path = f"{'/'.join(os.getcwd().split('/')[:os.getcwd().split('/').index('ETL') + 1])}/brickstudy_ingestion" + self.driver = get_driver() + else: + proj_path = '/opt/airflow/brickstudy_ingestion' + self.driver = driver + self.base_path = f"{proj_path}/src/scrapper" -class InsCrawler(): - URL = "https://www.instagram.com" - RETRY_LIMIT = 10 + user_id, password = self.load_config(dev=dev) + self.keywords = keywords + self.data = defaultdict(inst_generator) + self.numof_error = 0 - def __init__(self, has_screen=False): - super(InsCrawler, self).__init__() - self.browser = Browser(has_screen) - self.page_height = 0 - self.login() + self.login(user_id, password) - def login(self): - browser = self.browser - url = "%s/accounts/login/" % (InsCrawler.URL) - browser.get(url) - u_input = browser.find_one('input[name="username"]') - u_input.send_keys(os.getenv('INSTAGRAM_ID')) - p_input = browser.find_one('input[name="password"]') - p_input.send_keys(os.getenv('INSTAGRAM_PWD')) + if self.suspicous_check(): + #TODO 계정 사용비율 낮추기 + print("return True in suspicious check") + time.sleep(300) - login_btn = browser.find_one('button[type="submit"]') - login_btn.click() - @retry() - def check_login(): - if browser.find_one('input[name="username"]'): - raise RetryException() + def load_config(self, dev: bool = False): + if dev: + with open(f'{self.base_path}/config.json', 'r', encoding='utf-8') as f: + config = json.load(f) - check_login() + username = config['login']['username'][self.account_x] + password = config['login']['password'][self.account_x] + else: + username = os.getenv('INSTAGRAM_CLIENT_ID') + password = os.getenv('INSTAGRAM_CLIENT_PASSWORD') + return (username, password) + + def login(self, 
user_id: str, password: str): + # Instagram 접속 및 로그인 + url = 'https://www.instagram.com/' + self.driver.get(url) + time.sleep(random.randrange(4, 6) + random.random()) + self.driver.find_element(By.XPATH, '//*[@id="loginForm"]/div/div[1]/div/label/input').send_keys(user_id) + time.sleep(random.randrange(1, 3) + random.random()) + self.driver.find_element(By.XPATH, '//*[@id="loginForm"]/div/div[2]/div/label/input').send_keys(password) + time.sleep(random.randrange(1, 3) + random.random()) + self.driver.find_element(By.XPATH, '//*[@id="loginForm"]/div/div[3]/button/div').click() + time.sleep(random.randrange(5, 11) + random.random()) + + def materialize(self): + """ + self.data to csv file + """ + from src.scrapper.utils import current_datetime_getter + import csv + + with open(f"{self.base_path}/results/insdata_{current_datetime_getter()}.csv", 'w') as f: + w = csv.writer(f) + w.writerow(self.data.values()) + + def suspicous_check(self): + """ 현재 자동화 행동 의심받는지 확인 """ + try: + if 'wbloks_1' in self.driver.page_source: + print("자동화된 활동 경고가 나타났습니다.") + + close_button = self.driver.find_element(By.XPATH, '//div[@aria-label="Dismiss"]') + self.driver.execute_script("arguments[0].dispatchEvent(new MouseEvent('click', {bubbles: true}));", close_button) + + # # 닫기 버튼 클릭, 계정 사용 일시 중지 + # close_button = WebDriverWait(self.driver, 5).until( + # EC.element_to_be_clickable((By.XPATH, '//div[@aria-label="Dismiss"]')) + # ) + # close_button.click() + return True + return False + except Exception: + self.numof_error += 1 + return False - def get_latest_posts_by_tag(self, tag, num): - tag = 'enhypen' - url = f"{InsCrawler.URL}/explore/search/keyword/?q=%23{tag}" - self.browser.get(url) - self.browser.scroll_down() - #TODO 게시물 클릭, 컨텐츠 가져오기 \ No newline at end of file diff --git a/brickstudy_ingestion/src/scrapper/models.py b/brickstudy_ingestion/src/scrapper/models.py index 7c5954e..5cbeda9 100644 --- a/brickstudy_ingestion/src/scrapper/models.py +++ b/brickstudy_ingestion/src/scrapper/models.py @@ -6,7 +6,6 @@ class OliveyoungBrand: query_keyword: List[str] # api 쿼리 키워드 - 브랜드 영문이름 등 brand_shop_detail_url: str # 브랜드관 url - items: Dict[str, bool] # 브랜드 제품 리스트 {제품명:할인여부} category: List[str] # 브랜드가 속한 카테고리 released_date: str = field(default_factory='2024/08/05') # 신제품 출시 일자 @@ -15,7 +14,38 @@ def brand_generator(): return OliveyoungBrand( [], '', - {}, [], '' ) + +@dataclass +class OliveyoungItem: + item_name: str + item_detail_url: str + is_in_promotion: bool + reviews: List[str] + + +def oliveyoung_item_generator(): + return OliveyoungItem( + '', + '', + False, + [] + ) + +@dataclass +class InstagramData: + brand: str + post_url: str + full_text: str + username: str + like: int + saved_imgs: str + date: str + + +def inst_generator(): + return InstagramData( + '', '', '', '', 0, '', '' + ) diff --git a/brickstudy_ingestion/src/scrapper/oliveyoung.py b/brickstudy_ingestion/src/scrapper/oliveyoung.py index 5dd94bd..90ed7b7 100644 --- a/brickstudy_ingestion/src/scrapper/oliveyoung.py +++ b/brickstudy_ingestion/src/scrapper/oliveyoung.py @@ -1,16 +1,13 @@ from collections import defaultdict from datetime import datetime -from . 
import get_soup +from src.scrapper.utils import get_soup from src.scrapper.models import brand_generator class Brand: - def __init__(self, brand_metadata=None) -> None: - if brand_metadata: - self.brand_metadata = brand_metadata - else: - self.brand_metadata = defaultdict(brand_generator) + def __init__(self) -> None: + self.brand_metadata = defaultdict(brand_generator) def crawl_brand_metadata(self): self._get_brand_in_each_category( @@ -18,9 +15,6 @@ def crawl_brand_metadata(self): ) self._get_brand_shop_url() - def crawl_items(self): - self._get_items() - @staticmethod def _get_oliveyoung_category_urls() -> list: """ @@ -84,29 +78,18 @@ def _get_brand_shop_url(self) -> None: for a_tag in total_brand_list_soup.find_all('a'): brand_code = a_tag.get('data-ref-onlbrndcd') if brand_code: - brand_name = a_tag.text - if brand_name in self.brand_metadata.keys(): # Kor brand name - self.brand_metadata[brand_name].brand_shop_detail_url = brand_base_url + brand_code - code_name[brand_code] = brand_name + brand = a_tag.text + if brand in self.brand_metadata.keys(): # Kor brand name + self.brand_metadata[brand].brand_shop_detail_url = brand_base_url + brand_code + code_name[brand_code] = brand else: # Eng brand name try: kor_brand_name = code_name[brand_code] - self.brand_metadata[kor_brand_name].query_keyword.append(brand_name) + self.brand_metadata[kor_brand_name].query_keyword.append(brand) except Exception: pass - def _get_items(self) -> None: - """ - 각 브랜드의 제품 리스트, 해당 제품의 프로모션 여부 추가 - """ - for brand in self.brand_metadata.keys(): - brand_url = self.brand_metadata[brand].brand_shop_detail_url - brand_url_soup = get_soup(brand_url) - if brand_url_soup is None: - continue - item_dic = {} - for div in brand_url_soup.find_all('div', class_='prod-info'): - item_name = div.find('a').get('data-attr') - is_in_promotion = div.find('div', class_="discount") is not None - item_dic[item_name] = is_in_promotion - self.brand_metadata[brand].items = item_dic +if __name__ == "__main__": + brand = Brand() + brand.crawl_brand_metadata() + print(brand.brand_metadata) \ No newline at end of file diff --git a/brickstudy_ingestion/src/scrapper/oliveyoung_items.py b/brickstudy_ingestion/src/scrapper/oliveyoung_items.py new file mode 100644 index 0000000..ea8e29f --- /dev/null +++ b/brickstudy_ingestion/src/scrapper/oliveyoung_items.py @@ -0,0 +1,130 @@ +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC + +from datetime import datetime, timedelta +from collections import defaultdict +import requests +import time +import random + +from src.scrapper.models import oliveyoung_item_generator +from src.scrapper.utils import write_local_as_json + +class Items: + def __init__(self, brand_name: str, brand_url: str): + self.brand = brand_name + self.brand_url = brand_url + self.data = defaultdict(oliveyoung_item_generator) + self.item_id = None + self.driver = webdriver.Chrome() + + def crawl_total_items(self): + # brand 페이지에서 전체 item 정보들 수집 + self.driver.get(self.brand_url) + self._get_items() + + def crawl_reviews_in_each_items(self, item_id: str): + # 각 item 페이지에서 리뷰 수집 + self.item_id = item_id + item_url = self.data[item_id].item_detail_url + self.driver.get(item_url) + self._get_reviews() + + def _get_items(self) -> None: + """ + 하나의 brand page의 item page x에 있는 아이템정보(id, url, 상품명, 할인여부) 수집 + """ + # 최초 1페이지 상품 정보 수집 + self._get_products() + # 페이지 넘기면서 상품 정보 수집 + next_pages = 
self.driver.find_elements(By.CSS_SELECTOR, '.pageing a[data-page-no]') + if next_pages: + for next_page in next_pages: + try: + self.driver.execute_script("arguments[0].click();", next_page) + time.sleep(random.randrange(5, 7) + random.random()) + except: + time.sleep(2) + self._get_products() + + def _get_products(self) -> None: + """ + 아이템 element 찾아서 실제 수집 동작 + """ + products = self.driver.find_elements(By.CSS_SELECTOR, 'ul.prod-list.goodsProd div.prod a.thumb') + for product in products: + href = product.get_attribute('href') + data_ref_goodsno = product.get_attribute('data-ref-goodsno') + data_attr = product.get_attribute('data-attr') + is_in_promotion = len(product.find_elements(By.CLASS_NAME, 'discount')) > 0 + item_id = f"{self.brand}_{data_ref_goodsno}" + + self.data[item_id].item_name = data_attr + self.data[item_id].item_detail_url = href + self.data[item_id].is_in_promotion = is_in_promotion + + def _get_reviews(self): + self.__click_review_button() + self.__click_latest_button() + + self.__get_reviews_with_page_moving() + + def __click_review_button(self) -> None: + try: + review_button_element = self.driver.find_element(By.CSS_SELECTOR, 'a.goods_reputation[data-attr="상품상세^상품상세_SortingTab^리뷰"]') + self.driver.execute_script("arguments[0].scrollIntoView(true);", review_button_element) + self.driver.execute_script("arguments[0].click();", review_button_element) + time.sleep(random.randint(1, 3)) + except Exception as e: + print(f"리뷰 버튼 클릭 실패: {e}") + + def __click_latest_button(self) -> None: + try: + latest_button_element = self.driver.find_element(By.CSS_SELECTOR, 'a[data-sort-type-code="latest"][data-attr="상품상세^리뷰정렬^최신순"]') + self.driver.execute_script("arguments[0].scrollIntoView(true);", latest_button_element) + self.driver.execute_script("arguments[0].click();", latest_button_element) + time.sleep(random.randint(1, 3)) + except Exception as e: + print(f"최신순 버튼 클릭 실패: {e}") + + def __get_reviews_with_page_moving(self): + self.__get_reviews_in_each_page() + next_pages = self.driver.find_elements(By.CSS_SELECTOR, '.pageing a[data-page-no]') + flag = True + while flag: + flag = len(next_pages) == 10 # next page가 있으면 계속 클릭하면서 수집 + for i in range(len(next_pages)): + try: + self.driver.execute_script("arguments[0].click();", next_pages[i]) + time.sleep(random.randrange(3, 5) + random.random()) + except: + print("exception block in page moving") + time.sleep(3) + self.__get_reviews_in_each_page() + next_pages = self.driver.find_elements(By.CSS_SELECTOR, '.pageing a[data-page-no]') + + def __get_reviews_in_each_page(self): + """ + 리뷰 element 찾아서 실제 수집 동작 + """ + review_elements = self.driver.find_elements(By.CLASS_NAME, 'txt_inner') + date_elements = self.driver.find_elements(By.CLASS_NAME, 'date') + for rev_elem, date_elem in zip(review_elements, date_elements): + self.data[self.item_id].reviews.append((rev_elem.text, date_elem.text)) + + +if __name__ == "__main__": + brand_name = "토리든" + brand_url = "https://www.oliveyoung.co.kr/store/display/getBrandShopDetail.do?onlBrndCd=A002820&t_page=%EC%83%81%ED%92%88%EC%83%81%EC%84%B8&t_click=%EB%B8%8C%EB%9E%9C%EB%93%9C%EA%B4%80_%EC%83%81%EB%8B%A8&t_brand_name=%ED%86%A0%EB%A6%AC%EB%93%A0" + item_x = Items(brand_name, brand_url) + item_x.crawl_total_items() + print("crawl total item is done") + print(item_x.data) + + item_list = item_x.data.keys() + for idx, test_item in enumerate(item_list): + item_x.crawl_reviews_in_each_items(item_id=test_item) + if idx % 4 == 0: + write_local_as_json(item_x.data, './logs', f"{brand_name}_{idx}") diff 
--git a/brickstudy_ingestion/src/scrapper/readme.md b/brickstudy_ingestion/src/scrapper/readme.md index 4448800..65d9d98 100644 --- a/brickstudy_ingestion/src/scrapper/readme.md +++ b/brickstudy_ingestion/src/scrapper/readme.md @@ -2,11 +2,13 @@ ``` brickstudy_ingestion/src/scrapper -├── browser.py # selenium으로 크롤링에 필요한 configs, utils 정의 모듈 -├── inscrawler.py # instagram crawler main 모듈 -├── models.py -├── oliveyoung.py # oliveyoung scrapper main 모듈 -└── utils.py # 공통 메소드 +├── __init__.py # scrapper 모듈 entrypoint +├── Dockerfile # [Twitter] twitter crawler 동작 환경 +├── browser.py # [Instagram] selenium으로 크롤링하는데에 사용되는 로직 정의 모듈 +├── inscrawler.py # [Instagram]instagram crawler main 모듈 +├── models.py # [Oliveyoung] 올리브영 브랜드 수집 데이터 구조 +├── oliveyoung.py # [Oliveyoung] scrapper main 모듈 +└── utils.py # 공통 유틸리티 메소드 ``` diff --git a/brickstudy_ingestion/src/scrapper/utils.py b/brickstudy_ingestion/src/scrapper/utils.py index ca9d141..22dcec7 100644 --- a/brickstudy_ingestion/src/scrapper/utils.py +++ b/brickstudy_ingestion/src/scrapper/utils.py @@ -1,3 +1,91 @@ +def get_driver(): + """ + return selenium driver + """ + from selenium import webdriver + from selenium.webdriver.chrome.service import Service + from webdriver_manager.chrome import ChromeDriverManager + from selenium.webdriver.common.by import By + from selenium.webdriver.support.ui import WebDriverWait + from selenium.webdriver.support import expected_conditions as EC + proxies = [ + ["211.223.89.176:51147", + "121.66.105.19:51080", + "121.66.105.19:51080", + "8.213.128.6:8080"], + ["8.213.129.20:8090", + "8.213.129.20:5566", + "8.213.137.155:8090", + "8.220.204.215:808"], + ["8.220.205.172:9098", + "211.223.89.176:51147", + "8.213.128.90:2019", + "8.213.128.90:444"] + ] + user_agent_lst = [ + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1636.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36" + ] + options = webdriver.ChromeOptions() + # options.add_argument("--headless") + proxy = proxies[self.account_x][random.randrange(0, 4)] + webdriver.DesiredCapabilities.CHROME['proxy'] = { + "socksProxy": proxy, + "socksVersion": 4, + } + + options.add_argument("--disable-blink-features=AutomationControlled") + options.add_experimental_option("excludeSwitches", ["enable-automation"]) + options.add_experimental_option("useAutomationExtension", False) + driver = webdriver.Chrome( + options=options + ) + driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") + driver.execute_cdp_cmd("Network.setUserAgentOverride", {"userAgent": user_agent_lst[self.account_x]}) + return driver + + +def get_soup(url: str = None): + import urllib + from urllib.request import urlopen + from urllib.error import HTTPError, URLError + from bs4 import BeautifulSoup + import random + import time + + from src.common.exception import ExtractError + + user_agent_lst = ['Googlebot', 'Yeti', 'Daumoa', 'Twitterbot'] + user_agent = user_agent_lst[random.randint(0, len(user_agent_lst) - 1)] + headers = {'User-Agent': user_agent} + + try: + req = urllib.request.Request(url, headers=headers) + page = urlopen(req) + html = page.read().decode("utf-8") + soup = BeautifulSoup(html, 
"html.parser") + except (HTTPError, URLError) as e: + err = ExtractError( + code=000, + message=f"**{url}** HTTPError/URLError. Sleep 5 and continue.", + log=e + ) + time.sleep(5) # TODO 이 경우 해당 url에 대해 재실행 필요 + except (ValueError) as e: + err = ExtractError( + code=000, + message=f"**{url}** ValueError. Ignore this url parameter.", + log=e + ) + print(err) + soup = None # TODO 해당 url 무시 + else: + time.sleep(random.random()) + return soup + + def dict_partitioner(data: dict, level: int): total_n = len(data) partition_n = total_n // level @@ -12,7 +100,12 @@ def dict_partitioner(data: dict, level: int): start = end -def write_local_as_json(data, file_path, file_name): +def write_local_as_json(data: dict, file_path: str, file_name: str): + """ + data : dictionary with the dataclass value + file_path : directory string where the json file created + file_name : file name without extension + """ from dataclasses import asdict import json import os @@ -71,4 +164,13 @@ def wrapped_f(*args, **kwargs): return wrapped_f - return wrap \ No newline at end of file + return wrap + + +def current_datetime_getter(): + import pytz + from datetime import datetime + kst = pytz.timezone('Asia/Seoul') + current_time = datetime.now(kst) + current_datetime = current_time.strftime("%Y%m%d_%H%M%S") + return current_datetime \ No newline at end of file diff --git a/brickstudy_ingestion/tests/scrapper/test_instagram.py b/brickstudy_ingestion/tests/scrapper/test_instagram.py new file mode 100644 index 0000000..a1fee10 --- /dev/null +++ b/brickstudy_ingestion/tests/scrapper/test_instagram.py @@ -0,0 +1,13 @@ +from src.scrapper.ins_url import InsURLCrawler +from src.scrapper.ins_data import InsDataCrawler + + +def test_get_urls(): + keyword = '올리브영' + url_crawler = InsURLCrawler() + url_crawler.get_urls(keyword) + url_crawler.materialize() + + crawler = InsDataCrawler(url_crawler.data) + crawler.get_post_data() + crawler.materialize() \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index 0912597..8b4abb5 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -69,6 +69,7 @@ x-airflow-common: # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server # yamllint enable rule:line-length AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' + AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true' # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks # for other purpose (development, test and especially production usage) build/extend Airflow image. 
     _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow[postgres,virtualenv,apache-airflow-providers-mysql]}
@@ -79,7 +80,8 @@ x-airflow-common:
     - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
     - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
     - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
-  user: "${AIRFLOW_UID:-50000}:0"
+  # user: "${AIRFLOW_UID:-50000}:0"
+  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
   depends_on:
     &airflow-common-depends-on
     redis:
@@ -343,6 +345,20 @@ services:
       airflow-init:
         condition: service_completed_successfully

+  # Proxy container for docker socket
+  # Forward DockerOperator TCP connection to Host docker daemon
+  docker-socket-proxy:
+    image: tecnativa/docker-socket-proxy:0.1.1
+    environment:
+      CONTAINERS: 1
+      IMAGES: 1
+      AUTH: 1
+      POST: 1
+    privileged: true
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock:ro # read only
+    restart: always
+
+  # ========================================
+  # Kafka infra
   zookeeper:
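Since `twitter_crawler.py` now points `docker_url` at `tcp://docker-socket-proxy:2375`, the proxy can be sanity-checked from any container on the same compose network with the Docker SDK for Python — a sketch, assuming the `docker` package is available; the service name and API version come from this patch:

```python
import docker

# Reach the Docker daemon through the tecnativa/docker-socket-proxy service
# instead of a raw /var/run/docker.sock bind mount; only the API sections
# whitelisted in its environment (CONTAINERS, IMAGES, AUTH, POST) are exposed.
client = docker.DockerClient(base_url="tcp://docker-socket-proxy:2375", version="1.37")
print(client.version()["ApiVersion"])   # fails fast if the proxy is unreachable
print(len(client.containers.list()))    # allowed because CONTAINERS=1
```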