새 오브젝트가 작성 시간에 따라 키가있는 임의의 간격으로 추가되는 버킷이 있습니다. 예를 들어새 파일이 S3에 도착하면 luigi 태스크를 실행하십시오.
's3://my-bucket/mass/%s/%s/%s/%s/%s_%s.csv' % (time.strftime('%Y'), time.strftime('%m'), time.strftime('%d'), time.strftime('%H'), name, the_time)
사실, 이러한 Scrapy의 출력 크롤링한다. 이러한 크롤링을 마스터 .csv 제품 카탈로그 파일 (내가 "product_catalog.csv"라고 부름)과 일치시키는 작업을 트리거하고 싶습니다. 또한 정기적으로 업데이트됩니다.
지금 당장이 프로세스를 실행할 때마다 채울 전역 변수로 작성한 여러 개의 Python 스크립트가 있습니다. 이들은 가져온 속성이되어야합니다.
1) 새로운 CSV 파일에 표시 "S3 : // 나의 버킷/대용량/..."고유 키 이름으로 시간 크롤링에 따라 그래서 여기
이 일어날 필요가 무엇인가 완료되었습니다. 루이지는 이것을보고 시작합니다.
2) 새 파일에서 "cleaning.py"가 luigi에 의해 실행되므로 런타임에 "cleaning.py"매개 변수 (S3에 표시된 파일)를 제공해야합니다. 결과는 다음 단계로 전달되는 것 외에도 S3에 저장됩니다.
3) "product_catalog.csv"의 최신 버전은 데이터베이스에서 가져온 나는이 작업이 완료 이해가되지 않을 수 있습니다 실현 "matching.py"
에서 "cleaning.py"의 결과를 사용합니다. 모든 내용을 명확히하기 위해 필 요한 편집 내용을 제공 할 것입니다. 초기 응답에 따라
편집
, 나는 길을 따라 단계를 저장하는 풀 동작을 수 있도록이 구성했습니다. 하지만 지금 나는 꽤 길을 잃었다. 이것은 파이썬 프로젝트를 함께 묶은 첫 번째 시간이므로, 내가 이것을 수행함에 따라 배우는 .py 등이 있습니다. 늘 그렇듯이, 다음 번 장애물에서의 혼란에 이은 성공으로 인한 흥분의 울퉁불퉁 한 길입니다.
내 질문은 다음과 같습니다.
1) Scrapy에서 거미를 가져 오는 방법은 나에게 불확실합니다. 나는 그들 중 12 개 정도를 가지고 있으며 목표는 루이지가 그들 모두를 위해 크롤링> 클린> 일치 프로세스를 관리하도록하는 것입니다. Scrapy 설명서에 다음 내용이 포함되어 있습니다.
class MySpider(scrapy.Spider):
# Your spider definition
그 의미는 무엇입니까? 스파이더를 제어하는 스크립트에 스파이더를 다시 작성 하시겠습니까? 그것은 의미가 없으며 그들의 예는 도움이되지 않습니다.
2) 나는 S3로 내보내기 위해 파이프 라인을 구성했지만 luigi는 output()에서도 이것을 수행하는 것으로 보인다. 어느 것을 사용해야하며 어떻게 함께 연주해야합니까?
3) 루이지는 CrawlTask ()가 성공적으로 실행되었지만 몇 초 내에 완료되고 크롤링이 일반적으로 몇 분이 걸리기 때문에 잘못되었다고 말합니다. 성공에 해당하는 출력 파일도 없습니다.
4) S3 용 자격 증명은 어디에서 제공합니까?
여기 내 코드입니다. 나는 내가 더 나은 것으로 인식하는 것 대신에 작동하지 않는 것을 주석 처리했다. 그러나 내 감각은 내가하고 싶은 일에 대한 웅장한 아키텍처가 있다는 것입니다. 나는 아직 이해하지 못하고 있습니다.
import luigi
from luigi.s3 import S3Target, S3Client
import my_matching
from datetime import datetime
import os
import scrapy
from twisted.internet import reactor
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from my_crawlers.my_crawlers.spiders import my_spider
class CrawlTask(luigi.Task):
crawltime = datetime.now()
spider = luigi.Parameter()
#vertical = luigi.Parameter()
def requires(self):
pass
def output(self):
return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_{}.csv".format(self.crawltime))
#return S3Target("s3://my-bucket/mass/crawl_luigi_test_{}.csv".format(self.crawltime))
def run(self):
os.system("scrapy crawl %s" % self.spider)
#process = CrawlerProcess(get_project_settings())
#process.crawl("%s" % self.spider)
#process.start()
class FetchPC(luigi.Task):
vertical = luigi.Parameter()
def output(self):
if self.vertical == "product1":
return "actual_data_staging/product1_catalog.csv"
elif self.vertical == "product2":
return "actual_data_staging/product2_catalog.csv"
class MatchTask(luigi.Task):
crawltime = CrawlTask.crawltime
vertical = luigi.Parameter()
spider = luigi.Parameter()
def requires(self):
return CrawlTask(spider=self.spider)
return FetchPC(vertical=self.vertical)
def output(self):
return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_matched_{}.csv".format(self.crawltime))
#return S3Target("s3://my-bucket/mass/crawl_luigi_test_matched_{}.csv".format(CrawlTask.crawltime))
def run(self):
if self.vertical == 'product1':
switch_board(requires.CrawlTask(), requires.FetchPC())
MatchTask 나는 그게 내 제품 카탈로그에 긁힌 제품을 비교 쓴 파이썬 스크립트를 말합니다. 다음과 같이 보입니다.
def create_search(value):
...
def clean_column(column):
...
def color_false_positive():
...
def switch_board(scrape, product_catalog):
# this function coordinates the whole script
이 큰 시스템에는 무엇을 권하고 싶습니까? 모든 것은 파이썬으로 작성되었으며 Docker 컨테이너에서 물건을 포장하고 있습니다. – thaneofcawdor
Luigi 작업을 사용할 수 없습니까? 데이터 파이프 라인의 각 단계를 작업으로 구현하고 원하는대로 작업을 시작할 수 있습니다. 나는 Luigi를 사용한 적이 한번도 없었습니다. 저는이 물건에 대해 http://www.celeryproject.org/ celery를 사용하지만, 작업을 정의하고 구현하기위한 시스템이기도합니다. docker 사용에 관해서는 docker로이 모든 작업을 수행 할 수 있어야하지만 docker 네트워크를 사용해야하고 컨테이너를 올바르게 구성해야합니다. 컨테이너는 즉시 IP가 할당되므로 IP를 찾기 위해 '서비스 검색'을 할 수있는 방법이 필요합니다. 부두 노동자가 그렇게 할 수있는 도구를 만들 것이라고 확신합니다. –
나는 치료가 루이지에 신호를 보내면 치료가 생성 한 s3 키를 포함하는 Task.requires 객체로 보낼 수 있을까? 그런 다음 luigi는 s3에서 해당 키를 검색 할 수 있습니다. 이것은 바보 같지만 파이썬 코드 조각 (크롤러와 루이지)을 서로 이야기하고 그 정보를 전달하는 방법은 무엇입니까? – thaneofcawdor