ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Python 크롤링 공부
    Python 2021. 11. 8. 02:14

     

    1.

     

    import urllib.request
    from bs4 import BeautifulSoup
    from itertools import count 
    import pandas as pd
    import ssl
    import datetime
    def get_request_url(url, enc='utf-8'):
        """Fetch *url* and return the decoded response body, or None on failure.

        Args:
            url: Target URL (TLS certificate verification is disabled).
            enc: Expected character encoding of the response body.

        Returns:
            The body as str on HTTP 200 (decoded with 'replace' as a
            fallback), otherwise None.
        """
        context = ssl._create_unverified_context()   # skip certificate checks
        req = urllib.request.Request(url)
        try:
            response = urllib.request.urlopen(req, context=context)
            if response.getcode() == 200:
                print("get_request_url : [%s] Url Request Success"% datetime.datetime.now())
                rcv = response.read()
                try:
                    ret = rcv.decode(enc)
                except UnicodeDecodeError:
                    # Not every site serves valid text in the expected encoding.
                    ret = rcv.decode(enc, 'replace')
                    print("[Error!] get_request_url : [%s] Http Unicode Decode Error "% datetime.datetime.now())
                return ret
            # BUG FIX: non-200 responses previously fell off the end implicitly.
            return None
        except Exception as e:
            print(e)
            print("[%s] Error for URL " % datetime.datetime.now())
            return None
    def getPelicanaAddress(sido):
        """Scrape Pelicana store listings for one province/city (*sido*).

        Pages through the store-search results and returns a list of
        [store, sido, gungu, full_address, phone] records.
        NOTE(review): only the final row of each page decides the bEnd flag,
        so a short row in mid-page would not stop paging — confirm intent.
        """
        c = 0
        bEnd = True                     # flag: True once a page yields no parsable row
        result = []
        for page in count():            # count sido pages
            context = ssl._create_unverified_context()
            url = "https://pelicana.co.kr/store/stroe_search.html?branch_name=&gu=&si=%s&page=%s" \
            %(urllib.parse.quote(sido), str(page+1))
    
            rcv_data = get_request_url(url)         # get <http.client.HTTPResponse object>
            soupData = BeautifulSoup(rcv_data, "html.parser")
            for j in range(1,11):       # list items (10 per page)
                bEnd = False
                c += 1
                print(c)
                try:
                    # Row text splits into [name, address, ..., phone].
                    items = soupData.find('table', {'class' : 'table mt20'}).find_all('tr')[j].text.strip().split('\n')
                    address = items[1]
                    result.append([items[0], address.split()[0], address.split()[1], address, items[-2].strip()])
                except IndexError as e:
                    # Fewer than 10 rows -> assume this was the last page.
                    bEnd = True
                except Exception as e:
                    print('[Error!] getPelicanaAddress : ', e)
            if(bEnd == True):
                return result
    
    
    if __name__ == '__main__':
        sido_list = ['서울특별시','부산광역시','대구광역시','제주특별자치도','광주광역시','울산광역시','인천광역시','세종특별자치시','경기도','강원도','경상북도','경상남도','충청북도','충청남도','전라북도','전라남도','대전광역시']
        result = []
    
        print("페리카나 주소 크롤링 시작")
        for sido in sido_list:
            # BUG FIX: the return value used to be discarded, leaving `result`
            # empty and the CSV below without any rows.
            result += getPelicanaAddress(sido)
        print(result)
        print(len(result))
        perincana_table = pd.DataFrame(result, columns=('store', 'sido', 'gungu', 'address', 'phone'))
        perincana_table.head()
        perincana_table.to_csv('python_study/data/perlicana2.csv', encoding='utf-8',index =True)
        print("페리카나 주소 크롤링 종료")

     

     

    2.

     

    from itertools import count
    import pandas as pd
    from selenium  import webdriver
    from selenium.webdriver.chrome.webdriver import WebDriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.common.exceptions import TimeoutException
    from bs4 import BeautifulSoup
    
    ## python Selenium은 Sync Syntax로 동작하지만 
    # AJAX등으로 구현된 Dynamic DOM에 대한 처리는 별도로 해줘야한다.
    
    def getStoreList(driver : WebDriver, url : str):
        """Crawl the Goobne store locator at *url*, appending rows to the
        module-level `result` list (side effect; nothing is returned).

        Pages are requested by running the site's own `store.getList(n)` JS,
        then the rendered DOM is parsed with BeautifulSoup.
        """
        driver.get(url)
        for page in count():
            try:
                # Ask the page's own AJAX code to load page (page+1).
                driver.execute_script(f"store.getList({str(page+1)})")
                webels = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CLASS_NAME, 'lows')),message="Time Out")
                print(webels)
            except TimeoutException as tout:
                beEnd = True
                print('Page Error', tout)
                return
            rcv_page_data = driver.page_source
            soupData = BeautifulSoup(rcv_page_data, "html.parser")
            #[<tbody>] -> <tbody>
            beEnd = True
            for store_list in soupData.findAll('tbody', {'id':'store_list'}): 
                # get <tr> list
                for store_tr in store_list:  
                    try:                
                        beEnd=False
                        datas = list(store_tr.strings)
                        items = list(filter(None,[i.strip() for i in datas]))
                        store_address = items[2]
                        result.append([items[0], store_address.split()[0], store_address.split()[1], items[1], items[2]])
                    except:
                        beEnd = True
                        pass
                if beEnd:
                    break
            # NOTE(review): `break` above only exits the tbody loop; the page
            # loop ends solely via the TimeoutException return — confirm.
    
    # Driver script: crawl every Goobne store and dump the rows to CSV.
    result = []
    url = "http://www.goobne.co.kr/store/search_store.jsp"
    driver = webdriver.Chrome("python_study/driver/chromedriver.exe")
    getStoreList(driver, url)
    print(result)
    driver.close()
    driver.quit()
    gubne_table = pd.DataFrame(result, columns=('store', 'sido', 'gungu', 'address', 'phone'))
    gubne_table.to_csv("python_study/data/gubne_store.csv")

     

     

    3.

    from itertools import count
    import time
    import pandas as pd
    from selenium import webdriver
    from selenium.common.exceptions import ElementNotInteractableException, StaleElementReferenceException, TimeoutException
    from selenium.webdriver.support.wait import WebDriverWait
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support import expected_conditions as EC
    
    # BS 사용안하고 element select 
    
    # Interpark tour search: select elements directly with Selenium (no BS4).
    result = []
    driver = webdriver.Chrome("python_study/driver/chromedriver.exe")
    url = 'http://tour.interpark.com/'
    driver.get(url)
    try:
        # Type the search term ("France") and submit.
        input_search = driver.find_element_by_id('SearchGNBText')
        input_search.send_keys('프랑스')
        btn_search = driver.find_element_by_css_selector('button.search-btn')
        btn_search.click()
        WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, 'li.moreBtnWrap > button')), message="Time Out")
        driver.find_element_by_css_selector('.oTravelBox>.boxList>.moreBtnWrap>.moreBtn').click()
    except TimeoutException as tout:
        print(tout)
    except ElementNotInteractableException as e:
        # The "more" button can be briefly non-interactable; retry once.
        print(e)
        driver.find_element_by_css_selector('.oTravelBox>.boxList>.moreBtnWrap>.moreBtn').click()
    
    
    for page in count():
        beEnd = True
        try:
            beEnd = False
            # BUG FIX: the original nested single quotes closed the f-string
            # early, emitting `SetCategoryList(n, )` instead of passing an
            # empty-string second argument.
            driver.execute_script(f"searchModule.SetCategoryList({page+1}, '')")
            time.sleep(5)
            # querySelectorAll equivalent
            li_els = driver.find_elements_by_css_selector('div.panelZone > div.oTravelBox > ul.boxList > li.boxItem')
    
            beEnd=True
            for element in li_els:
                beEnd=False
                result.append([
                    element.find_element_by_css_selector('h5.proTit').text,
                    element.find_element_by_css_selector('strong.proPrice').text.replace('~', ''),
                    element.find_elements_by_css_selector('.info-row .proInfo')[1].text.split(':')[1].strip()
                ])
        except TimeoutException as e:
            print(e)
            beEnd = True   # BUG FIX: was misspelled `beEne`, so timeouts never ended the loop
        except StaleElementReferenceException as e:
            print(e)
            continue
        except IndexError as e:
            print(e)
            continue
        if beEnd:
            break
    
    df = pd.DataFrame(result, columns=('title', 'price', 'info'))
    df.to_csv('python_study/data/tour.csv')
    print(result)
    driver.close()
    driver.quit()

     

     

    4.

    from bs4 import BeautifulSoup
    
    # Read a local XML file and pull the <author> elements out with BeautifulSoup.
    with open('python_study/data/book.xml', 'r') as f:
        book_xml = f.read()
        print(book_xml)
    
    soupData = BeautifulSoup(book_xml, 'xml')
    books = soupData.find_all('author')
    # .text and .get_text() are equivalent accessors.
    print(books[0].text, books[1].get_text())
    
    ...
    
    ###
    # Build the same XML programmatically with xml.etree.ElementTree.
    ###
    from xml.etree.ElementTree import Element,  SubElement, ElementTree
    
    filename = 'xmlSample'
    root = Element('books')
    book_el = SubElement(root, 'book')
    pubinfo_el = SubElement(root, 'pubinfo')
    # Create each leaf and set its text in one step. The original chained
    # assignments (`x = SubElement(...).text = '...'`) bound the *text string*,
    # not the element, to throwaway variables, so no targets are kept here.
    SubElement(book_el, 'author').text = '아무개'
    SubElement(book_el, 'price').text = '30000'
    SubElement(book_el, 'pubdate').text = '01/01/2020'
    
    SubElement(pubinfo_el, 'author').text = '아무개'
    SubElement(pubinfo_el, 'price').text = '30000'
    SubElement(pubinfo_el, 'pubdate').text = '01/01/2020'
    
    tree = ElementTree(root)
    tree.write('python_study/data/'+filename+'.xml',encoding='utf-8')
    
    # Write the equivalent XML by hand as a literal string.
    text = '''
    <books>
        <book>
            <author>아무개</author>
            <price>30000</price>
            <pubdate>01/01/2020</pubdate>
        </book>
        <pubinfo>
            <author>아무개</author>
            <price>30000</price>
            <pubdate>01/01/2020</pubdate>
        </pubinfo>
    </books>
    '''
    with open('python_study/data/test.xml', 'wt', encoding='utf-8') as f:
        # `with` closes the file on exit: the original's explicit f.close()
        # inside the block was redundant, and writelines() on a single str
        # iterated it character by character — write() emits it in one call.
        f.write(text)
    
    
    #### Reading JSON
    import json
    with open('python_study/data/test.json', 'rt') as f:
        content = f.read()
        json_data = json.loads(content)     # dict parsed from the file text
        print(json_data)
        #print(json_data.author)
        print(json_data['packages'])        # assumes the file has a 'packages' key — TODO confirm
    
    
    #### Writing JSON
    dic_data = {'key':'value', 'number':0, 'list':[1,2,3], 'obj':{'v1':'vv'}}
    with open('python_study/data/saved.json', 'w') as f:
        json.dump(dic_data, f)
    
    
    
    import urllib.request 
    
    # Fetch a small JSON document over HTTP and index into it.
    res = urllib.request.urlopen('http://time.jsontest.com')
    data = json.loads(res.read())
    print(data, data['time'])

     

     

    5.

    import urllib.request
    import ssl 
    
    from bs4 import BeautifulSoup
    
    import datetime
    import math
    import pandas as pd
    
    context = ssl._create_unverified_context()
    
    def get_request_url(url):
        """Request *url* and return its body decoded as CP949.

        On HTTP 200 the decoded text is returned; any raised error is
        printed together with the URL and None is returned. Other status
        codes fall through and yield None silently.
        """
        request = urllib.request.Request(url)
        try:
            response = urllib.request.urlopen(request)
            status_ok = response.getcode() == 200
            if status_ok:
                print("[%s] Url Request Success" % datetime.datetime.now() )
                body = response.read()
                return body.decode("CP949")
        except Exception as err:
            print(err)
            print("[%s] Error for URL : %s" %(datetime.datetime.now(), url))
            return None
    
    
    if __name__ == "__main__":
        
        curr_page = 1   # current page number
        views = 20      # rows shown per page
        last_num = 0    # total row count, used to derive the last page
        last_page = 0   # last page number
    
        eq_list = []
    
        stDate = "2017-03-24"
        edDate = "2020-03-24"
    
        while(1):
            endpoint = "http://www.weather.go.kr/weather/earthquake_volcano/domesticlist.jsp?"
            params = "startSize=2&endSize=999&x=0&y=0&schOption=T"
            params += "&startTm=" + stDate
            params += "&endTm=" + edDate
            params += "&pNo="  + str(curr_page)
    
            url = endpoint + params
    
            response = get_request_url(url)
            soupdata = BeautifulSoup(response, "html.parser")
            tableData = soupdata.find("table", {"id":"excel_body"})
            tableBody = tableData.find("tbody")        
            tr_list = tableBody.find_all("tr")
            
            # Drop the button row (page navigation is rendered as a table row).
            del tr_list[-1]
    
            # Compute the last page once, from the counter in the first row.
            if(last_num == 0):
                #print(last_num, views)
                last_num = int(tr_list[0].find('td').text)
                last_page = math.ceil(last_num/views)    # ceil(total / rows-per-page)
    
            for tr in tr_list:
                tr = list(tr.strings)
                print(tr)
    
                # timestamp / magnitude / depth(km) / max intensity / lat / lon / location
                eq_info = {}
                eq_info['timestamp']    = tr[1]
                eq_info['magnitude']    = tr[2]
                eq_info['depth']        = tr[3]
                eq_info['MMI_scale']    = tr[4]
                eq_info['lat']          = tr[5]
                eq_info['lon']          = tr[6]
                eq_info['location']     = tr[7]
    
                eq_list.append(eq_info)          
    
            result = pd.DataFrame(eq_list)
            print(result)
            # Fix the column order.
            result = result[['timestamp', 'magnitude', 'depth', 'MMI_scale', 'lat', 'lon','location']]
            print(result)
            # Stop on the last page; otherwise advance and keep crawling.
            if(last_page > curr_page):
                curr_page = curr_page + 1
            else:
                break
                   
        result.to_csv("python_study/data/earthquake_list_%s-%s.csv"%(stDate[0:4], edDate[0:4]), encoding="utf-8")
        print("Crawling Complete!!")

     

    6.

     

    # http://openapi.tour.go.kr/openapi/service/TourismResourceStatsService/getPchrgTrrsrtVisitorList?&_type=json&pageNo=8&numOfRows=20&serviceKey={serviceKey}&YM=201201&GUNGU=&SIDO=&RES_NM=
    
    import json
    import math
    import sys
    sys.path.append('python_study')
    import numpy as np
    from http_request import get_request_url    # my module
    import urllib.request
    import pandas as pd
    
    
    service_key = None
    with open('python_study/config/setting.json', 'rt') as f:
        service_key = json.load(f)['key']
    
    def getTourPointVisitor(yyyymm, sido, gungu, nPageNum, nItems):
        """Call the tourism open API for one month and return the parsed JSON.

        Args:
            yyyymm: Year+month string, e.g. "201201".
            sido, gungu: Region filters (URL-quoted; empty string = all).
            nPageNum: 1-based page number.
            nItems: Rows per page.

        Returns:
            dict parsed from the JSON response, or None when the request failed.
        """
        end_point="http://openapi.tour.go.kr/openapi/service/TourismResourceStatsService/getPchrgTrrsrtVisitorList"
        parameters="?_type=json&serviceKey="+service_key
        parameters+="&pageNo="+str(nPageNum)
        parameters+="&numOfRows="+str(nItems)
        parameters+="&YM="+yyyymm
        parameters+="&GUNGU="+ urllib.parse.quote(gungu)
        parameters+="&SIDO="+ urllib.parse.quote(sido)
        parameters+="&RES_NM="
        url = end_point+parameters
        print(url)
        retData = get_request_url(url)
        if(retData==None):
            return None
        else:
            # json.load takes a file-like object — get_request_url returns the
            # raw HTTPResponse here (json.loads would take str/bytes instead).
            return json.load(retData)
            
         
    # Collect visitor stats for every month from 2011-01 to 2016-12.
    tourList = pd.DataFrame()
    for year in range(2011,2017):
        rowCount = 500
        for month in range(1,13):
            yyyymm = "{0}{1:0>2}".format(str(year),str(month))
            nPageNum = 1        
            while True:  # page loop
                jsonData = getTourPointVisitor(yyyymm,'','',nPageNum,rowCount)
                if (jsonData['response']['header']['resultMsg'] == 'OK'):
                    nTotal= jsonData['response']['body']['totalCount']
                    print('total length : ', nTotal)
                    if nTotal== 0: break
                    # Identity `map(lambda dict:dict, ...)` removed — it also
                    # shadowed the builtin `dict`.
                    for item in jsonData['response']['body']['items']['item']:
                        tourList = tourList.append(item, ignore_index=True)
                    print(tourList)
                    nPage = math.ceil(nTotal/rowCount)
                    nPageNum+=1
                    # BUG FIX: `nPageNum == nPage` stopped one page early and
                    # never stopped at all when nPage == 1.
                    if(nPageNum > nPage): break
                else:
                    break
    
    tourList = tourList[['ym', 'addrCd', 'gungu', 'sido', 'resNm', 'rnum', 'csForCnt', 'csNatCnt']]
    # BUG FIX: rename/fillna return new frames; the originals were discarded.
    tourList = tourList.rename(columns={'ym':'yyyymm'})
    tourList = tourList.fillna(0)
    print(tourList)
    tourList.to_csv('python_study/data/서울특별시_관광지입장정보_2011_2016.csv', encoding='utf-8')
        
    print('%s_관광지입장정보_%d_%d.json' %("서울특별시", 2011,2016))

     

     

     

    7.

    import json
    import pandas as pd
    import sys
    sys.path.append('python_study')
    from http_request import get_request_url    # my module
    
    
    # Foreign-visitor (entry) statistics per month, 2017-2018, via the open API.
    service_key = None
    with open('python_study/config/setting.json', 'rt') as f:
        service_key = json.load(f)['portKey']
    
    print(service_key)
    
    
    nat_cd = ''
    ed_cd = 'E'
    totalCount = '300'
    
    result=[]
    for year in range(2017,2019):
        for month in range(1,13):
            yyyymm = '{0}{1:0>2}'.format(str(year), str(month))
            url = 'http://openapi.tour.go.kr/openapi/service/EdrcntTourismStatsService/getForeignTuristStatsList?'
            params = f'ServiceKey={service_key}&_type=json'
            params += f'&YM={yyyymm}'
            params += f'&NAT_CD={nat_cd}'
            params += f'&ED_CD={ed_cd}'
            # BUG FIX: the API parameter is `numOfRows` (was misspelled
            # `numOfRaws`, so the row-count setting was silently ignored).
            params += f'&numOfRows={totalCount}'
    
            url = url+params
            print(url)
    
            jsonResult=[]
            res_msg = get_request_url(url)
            if res_msg is not None:
                jsonData = json.load(res_msg)
    
                if jsonData['response']['header']['resultMsg'] == 'OK':
                    totalCount = jsonData['response']['body']['totalCount']
                    if totalCount == 1:
                        # A single result arrives as a dict, not a list.
                        item = jsonData['response']['body']['items']['item']
                        natKorNm = item['natKorNm']
                        natCd = item['natCd']
                        # BUG FIX: was `json['resonse'][...]` — indexing the json
                        # *module* with a misspelled key; crashed on single-item months.
                        num = item['num']
                        result.append([natKorNm, natCd, yyyymm, num])
                    elif totalCount > 1:
                        for item in jsonData['response']['body']['items']['item']:
                            natKorNm = item['natKorNm']
                            natCd = item['natCd']
                            num = item['num']
                            result.append([natKorNm, natCd, yyyymm, num])
    
    
    df = pd.DataFrame(result, columns=['국가명', '국가코드', '입국연도', '입국자수'])
    df.to_csv('python_study/data/해외방문객정보.%s.csv'%(yyyymm))

     

     

    8.

    import pymysql
    import json
     
    
    # Interactive insert loop: read user rows from stdin into `usertable`.
    password = ""
    with open('config/setting.json', 'rt') as f:
        password = json.load(f)['DBPassword']
    
    
    with pymysql.connect(host='220.123.224.95', user='root', password='1234', db='python', charset='utf8') as conn:
        cursor = conn.cursor()
        # BUG FIX: pymysql quotes string parameters itself; the original
        # '%s' placeholders were wrapped in quotes, producing ''value''.
        query =  "INSERT INTO usertable VALUES (%s, %s, %s)"
    
        while(True):
            # BUG FIX: the original ran cursor.execute('') each iteration,
            # which pymysql rejects as an empty query.
            name = input('이름 입력 :')
            email = input('이메일 입력 :')
            birthyear = input('태어난 연도 입력 :')
            if name == 'close':
                break
            # Parameterized execute: values are escaped by the driver.
            cursor.execute(query, (name, email, birthyear))
    
        conn.commit()   # BUG FIX: was misspelled `conn.commtit()`

     

     

    9.

    module/pelicana.py

    import urllib.request
    from bs4 import BeautifulSoup
    from itertools import count 
    import pandas as pd
    import ssl
    import datetime
    def get_request_url(url, enc='utf-8'):
        """Fetch *url* (without TLS verification) and return the decoded body.

        Returns the body as str on HTTP 200 (falling back to 'replace' on
        decode errors); returns None implicitly on any other status or error.
        """
        context = ssl._create_unverified_context()
        req = urllib.request.Request(url)
        try:
            respose = urllib.request.urlopen(req, context=context)
            if respose.getcode() == 200:
                print("get_request_url : [%s] Url Request Success"% datetime.datetime.now())
                try:
                    rcv = respose.read()
                    ret = rcv.decode(enc)
                except UnicodeDecodeError:
                    ret = rcv.decode(enc, 'replace')    # not every site serves valid unicode
                    print("[Error!] get_request_url : [%s] Http Unicode Decode Error "% datetime.datetime.now())
                return ret
        except Exception as e:
            print(e)
            print("[%s] Error for URL " % datetime.datetime.now())
    def getPelicanaAddress(sido):
        """Scrape Pelicana store listings for one province/city (*sido*).

        Pages through the store-search results and returns a list of
        [store, sido, gungu, full_address, phone] records.
        NOTE(review): only the final row of each page decides the bEnd flag —
        confirm a short mid-page row cannot occur.
        """
        c = 0
        bEnd = True                     # flag: True once a page yields no parsable row
        result = []
        for page in count():            # count sido pages
            context = ssl._create_unverified_context()
            url = "https://pelicana.co.kr/store/stroe_search.html?branch_name=&gu=&si=%s&page=%s" \
            %(urllib.parse.quote(sido), str(page+1))
    
            rcv_data = get_request_url(url)         # get <http.client.HTTPResponse object>
            soupData = BeautifulSoup(rcv_data, "html.parser")
            for j in range(1,11):       # list items (10 per page)
                bEnd = False
                c += 1
                print(c)
                try:
                    # Row text splits into [name, address, ..., phone].
                    items = soupData.find('table', {'class' : 'table mt20'}).find_all('tr')[j].text.strip().split('\n')
                    address = items[1]
                    result.append([items[0], address.split()[0], address.split()[1], address, items[-2].strip()])
                except IndexError as e:
                    # Fewer than 10 rows -> assume this was the last page.
                    bEnd = True
                except Exception as e:
                    print('[Error!] getPelicanaAddress : ', e)
            if(bEnd == True):
                return result
    
    
    if __name__ == '__main__':
        sido_list = ['서울특별시','부산광역시','대구광역시','제주특별자치도','광주광역시','울산광역시','인천광역시','세종특별자치시','경기도','강원도','경상북도','경상남도','충청북도','충청남도','전라북도','전라남도','대전광역시']
        result = []
    
        print("페리카나 주소 크롤링 시작")
        for sido in sido_list:
            # BUG FIX: the return value used to be discarded, leaving `result`
            # empty and the CSV below without any rows.
            result += getPelicanaAddress(sido)
        print(result)
        print(len(result))
        perincana_table = pd.DataFrame(result, columns=('store', 'sido', 'gungu', 'address', 'phone'))
        perincana_table.head()
        perincana_table.to_csv('python_study/data/perlicana2.csv', encoding='utf-8',index =True)
        print("페리카나 주소 크롤링 종료")

     

     

    import pymysql
    from python_study.module.pelicana import getPelicanaAddress
    
    ## Note: in Oracle (unlike MySQL) each user account is itself one DB/schema.
    
    if __name__ == '__main__':
        sido_list = ['서울특별시','부산광역시','대구광역시','제주특별자치도','광주광역시','울산광역시','인천광역시','세종특별자치시','경기도','강원도','경상북도','경상남도','충청북도','충청남도','전라북도','전라남도','대전광역시']
        result = []
    
        print("페리카나 주소 크롤링 시작")
        # Gather store rows for every province, then bulk-insert into MySQL.
        for sido in sido_list:
            result += getPelicanaAddress(sido)
    
        conn = pymysql.connect(host='220.123.224.95', user='root', password='1234', db='pythondb', charset='utf8')
        cursor = conn.cursor()
        # query = "SELECT * FROM usertable LIMIT 5"
        query =  """INSERT INTO pelicana_pyj(store, sido, gungu, address, phone) 
                        VALUES (%s, %s, %s, %s, %s) """
        
        
        # Parameterized execute: pymysql escapes and quotes each value.
        for item in result:    
            cursor.execute(query, (item[0], item[1], item[2],item[3],item[4]))
    
        conn.commit()
        conn.close()

     

     

    10.

     

    module/PelicanaDB.py

    from typing import overload
    import pymysql
    
    class PelicanaDB():
        """Thin pymysql wrapper around the `pelicana_pyj` table."""

        def __init__(self,) -> None:
            self.db_init()
    
        def db_init(self):
            """Open the MySQL connection (hard-coded dev credentials)."""
            self.conn = pymysql.connect(host='220.123.224.95', user='root', password='1234', db='pythondb', charset='utf8')
    
        def db_free(self):
            """Close the connection if one is open."""
            if self.conn :
                self.conn.close()
    
        def pelicana_insert(self, store, sido, gungu, address, phone):
            """Insert one store row (parameterized) and commit."""
            # Plain string: the original used an f-string with no placeholders.
            query = """INSERT INTO pelicana_pyj(store, sido, gungu, address, phone) 
                        VALUES (%s, %s, %s, %s, %s) """
            with self.conn.cursor() as cur:
                cur.execute(query, (store, sido, gungu, address, phone))
                print('Last input Data ID :', cur.lastrowid)
            
            self.conn.commit()
    
        def pelicana_lastSelect(self):
            """Return the most recently inserted row, or None when the table is empty."""
            # Plain string: f-prefix removed — there were no placeholders.
            query = 'select * from pelicana_pyj ORDER BY id DESC LIMIT 1'
            with self.conn.cursor() as cur:
                cur.execute(query)
                row = cur.fetchone()
                print(row)
                if row:
                    return row
               
    
    
    if __name__ == '__main__':
        # Smoke test: insert one dummy row.
        db = PelicanaDB()
        db.pelicana_insert('토성', '갤럭시', '은하구','안드로메다1234', '102-34-41')

     

     

    from python_study.module.PelicanaDB import PelicanaDB
    import sys
    sys.path.append('python_study')
    from python_study.module.pelicana import getPelicanaAddress
    
    if __name__ == '__main__':
        sido_list = ['서울특별시','부산광역시','대구광역시','제주특별자치도','광주광역시','울산광역시','인천광역시','세종특별자치시','경기도','강원도','경상북도','경상남도','충청북도','충청남도','전라북도','전라남도','대전광역시']
        result = []
    
        print("페리카나 주소 크롤링 시작")
        # Gather rows for every province, then persist them via PelicanaDB.
        for sido in sido_list:
            result += getPelicanaAddress(sido)
    
        
        db = PelicanaDB()
        
        # item = [store, sido, gungu, address, phone]
        for item in result:    
            db.pelicana_insert(item[0], item[1], item[2],item[3],item[4])
    
        print('페리카나 주소 디비 저장 완료')
        db.db_free()
        print('크롤링 완료')

     

     

     

    11.

    module/http_request.py

    def get_request_url(url):
        """Open *url* and return the raw HTTPResponse on HTTP 200.

        On any raised error, the exception and URL are printed and None is
        returned; a non-200 status also yields None (silently).
        """
        import urllib.request
        import datetime
        req = urllib.request.Request(url)
        resp = None
        try:
            resp = urllib.request.urlopen(req)
        except Exception as err:
            print(err)
            print('res : ', resp)
            print("[%s] Error for URL : %s" %(datetime.datetime.now(), url))
            return None
        if resp.getcode() == 200:
            print("[%s] Url Request Success" % datetime.datetime.now() )
            return resp

     

    module/TourismDB.py

    import pymysql
    
    class TourismDB():
        """Small pymysql helper around the `tourism_pyj` table."""

        def __init__(self) -> None:
            self.db_init()
    
        def db_init(self):
            """Connect to the MySQL server (dev credentials)."""
            self.conn = pymysql.connect(host='220.123.224.95', user='root', password='1234', db='pythondb', charset='utf8')
    
        def db_free(self):
            """Close the connection when one is open."""
            if self.conn :
                self.conn.close()
    
        def tourism_insert(self, natKorNm, natCd, yyyymm, num):
            """Insert one monthly visitor record (parameterized) and commit."""
            query =  """INSERT INTO tourism_pyj(natKorNm, natCd, yyyymm, num) 
                        VALUES (%s, %s, %s, %s) """ 
            params = (natKorNm, natCd, yyyymm, num)
            with self.conn.cursor() as cursor:
                cursor.execute(query, params)
                print('Last input Data ID :', cursor.lastrowid)
            self.conn.commit()
    
        def tourism_lastSelectPrint(self):
            """Print and return the newest row (None when the table is empty)."""
            query = 'select * from tourism_pyj ORDER BY id DESC LIMIT 1'
            with self.conn.cursor() as cursor:
                cursor.execute(query)
                latest = cursor.fetchone()
                print(latest)
                return latest if latest else None
    
    if __name__ == '__main__':
        # Smoke test: insert a dummy record, echo the newest row, disconnect.
        db = TourismDB()
        db.tourism_insert('태양나라', '999', '999902', 1000)
        db.tourism_lastSelectPrint()
        db.db_free()

     

    import json
    import sys
    sys.path.append('python_study')
    sys.path.append('module')
    from python_study.module.http_request import get_request_url    # my module
    from python_study.module.TourismDB import TourismDB
    
    # Crawl monthly foreign-visitor stats (2017-2018) and store them in MySQL.
    service_key = None
    with open('python_study/config/setting.json', 'rt') as f:
        service_key = json.load(f)['portKey']
    
    print(service_key)
    
    
    nat_cd = ''
    ed_cd = 'E'
    totalCount = '300'
    
    result=[]
    for year in range(2017,2019):
        for month in range(1,13):
            yyyymm = '{0}{1:0>2}'.format(str(year), str(month))
            url = 'http://openapi.tour.go.kr/openapi/service/EdrcntTourismStatsService/getForeignTuristStatsList?'
            params = f'ServiceKey={service_key}&_type=json'
            params += f'&YM={yyyymm}'
            params += f'&NAT_CD={nat_cd}'
            params += f'&ED_CD={ed_cd}'
            # BUG FIX: the API parameter is `numOfRows` (was `numOfRaws`).
            params += f'&numOfRows={totalCount}'
    
            url = url+params
            print(url)
    
            jsonResult=[]
            res_msg = get_request_url(url)
            if res_msg is not None:
                jsonData = json.load(res_msg)
    
                if jsonData['response']['header']['resultMsg'] == 'OK':
                    totalCount = jsonData['response']['body']['totalCount']
                    if totalCount == 1:
                        # A single result arrives as a dict, not a list.
                        item = jsonData['response']['body']['items']['item']
                        natKorNm = item['natKorNm']
                        natCd = item['natCd']
                        # BUG FIX: was `json['resonse'][...]` — indexing the json
                        # *module* with a misspelled key; crashed on single-item months.
                        num = item['num']
                        result.append([natKorNm, natCd, yyyymm, num])
                    elif totalCount > 1:
                        for item in jsonData['response']['body']['items']['item']:
                            natKorNm = item['natKorNm']
                            natCd = item['natCd']
                            num = item['num']
                            result.append([natKorNm, natCd, yyyymm, num])
    
    
    db = TourismDB()
    for row in result:
        # BUG FIX: row is [natKorNm, natCd, yyyymm, num]; the original passed
        # row[1] twice and dropped the visitor count (row[3]).
        db.tourism_insert(row[0], row[1], row[2], row[3])
    db.db_free()

     

     

    12.

    module/PelicanaDBOracle.py

    import cx_Oracle
    
    class PelicanaDBOracle():
        """cx_Oracle wrapper around the `pelicana_crawling` table."""

        def __init__(self) -> None:
            self.db_init()
    
        def db_init(self):
            """Connect to the local Oracle XE instance (dev credentials)."""
            self.conn = cx_Oracle.connect('findaw/1234@localhost:1521/xe')
    
        def db_free(self):
            """Close the connection if one is open."""
            if self.conn :
                self.conn.close()
    
        def pelicana_insert(self, store, sido, gungu, address, phone):
            """Insert one store row (id from ID2_SEQ, timestamped) and commit."""
            sql = '''insert into pelicana_crawling                    
                    values(ID2_SEQ.nextval,:1,:2, :3, :4,:5, CURRENT_TIMESTAMP) '''
            with self.conn.cursor() as cur:
                cur.execute(sql, (store, sido, gungu, address, phone))
                print('Last input Data ID :', cur.lastrowid)
            
            self.conn.commit()
            
        def pelicana_lastSelect(self):
            """Return the most recently inserted row, or None when empty.

            BUG FIX: the original statement ended with a semicolon (Oracle
            rejects it, ORA-00911) and applied `WHERE ROWNUM = 1` *before*
            ORDER BY, which returns an arbitrary row — the ordering must go
            in a subquery for a correct top-N fetch.
            """
            sql = ('SELECT * FROM (SELECT * FROM pelicana_crawling ORDER BY id DESC) '
                   'WHERE ROWNUM = 1')
            with self.conn.cursor() as cur:
                cur.execute(sql)
                row = cur.fetchone()
                print(row)
                if row:
                    return row
    
    
    if __name__ == '__main__':
        # Smoke test: insert one dummy row into Oracle.
        db = PelicanaDBOracle()
        db.pelicana_insert('토성', '갤럭시', '은하구','안드로메다1234', '102-34-41')

     

    from python_study.module.PelicanaDBOracle import PelicanaDBOracle
    from python_study.module.pelicana import getPelicanaAddress
    
    if __name__ == '__main__':
        sido_list = ['서울특별시','부산광역시','대구광역시','제주특별자치도','광주광역시','울산광역시','인천광역시','세종특별자치시','경기도','강원도','경상북도','경상남도','충청북도','충청남도','전라북도','전라남도','대전광역시']
        result = []
    
        print("페리카나 주소 크롤링 시작")
        # Gather rows for every province, then persist them into Oracle.
        for sido in sido_list:
            result += getPelicanaAddress(sido)
    
        
        db = PelicanaDBOracle()
        
        # item = [store, sido, gungu, address, phone]
        for item in result:    
            db.pelicana_insert(item[0], item[1], item[2],item[3],item[4])
    
        print('페리카나 주소 디비 저장 완료')
        db.db_free()
        print('크롤링 완료')
        
    (8) < (2,4)
Designed by Tistory.