본문 바로가기

주식 공부

[주식거래자동화] 07. 일별거래데이터 DB 저장

반응형

KOSPI 200 종목에 대해 KOAStudio 에서 조회한 정보를 바탕으로 DB를 생성하여 종목코드별 일봉차트를 저장한다.

KOSPI 200 종목코드 받기 및 일별 데이터 조회 방법은 이전 글에 기록해 두었다.

2020/12/09 - [주식 공부] - [주식거래자동화] 06. KOSPI 종목코드 및 일별 데이터 조회

1. DB Table 생성

KOAStudio로 주식일봉차트조회요청 TR 조회 시 1일 15개의 데이터를 주는 것을 확인했다. 이 정보를 바탕으로 DB Table 생성하여 전체 일자별 데이터를 저장하려 한다.

우선 조회되는 15개 항목 중 저장할 항목들은 아래 8개의 데이터이다.

["종목코드" ,"현재가" ,"거래량" ,"거래대금" ,"일자" ,"시가" ,"고가" ,"저가"]

해당 정보를 저장하기 위해 PostgreSQL 에 Table 을 생성한다.

CREATE TABLE TBL_DAILY_STOCK (
	STCODE         CHAR(8) NOT NULL,  -- 종목코드
	DATE           DATE NOT NULL,     -- 일자
	PRICE_CLOSE    INT NOT NULL,      -- 현재가
	PRICE_OPEN     INT NOT NULL,      -- 시가
	PRICE_HIGH     INT NOT NULL,      -- 고가
	PRICE_LOW      INT NOT NULL,      -- 저가
	TRADING_VOLUME INT NOT NULL,      -- 거래량
	TRADING_VALUE  INT NOT NULL,      -- 거래대금
	PRIMARY KEY (STCODE, DATE)
);

 

정상적으로 테이블이 생성되었는지 확인한다.

SELECT * FROM TBL_DAILY_STOCK;

 

정상적으로 생성된 것을 확인 하였다.

 

임의의 테스트 정보를 입력 후 재조회하여 동작을 확인한다.

INSERT INTO TBL_DAILY_STOCK (STCODE, DATE, PRICE_CUR, PRICE_START, PRICE_HIGH, PRICE_LOW, TRADING_VOLUME, TRADING_VALUE)
VALUES ('000001', '19700901', 1000, 800, 1100, 900, 123, 123000);

 

위의 SELECT 쿼리를 실행한 결과 정상적으로 데이터가 입력된 것을 확인할 수 있다.

 

2. Python 테스트 코드 작성

이제 Python 코드로 해당 테이블 조회하는 코드를 구현하기 위해 psycopg2 모듈을 stock_test 아나콘다 환경에 install 한다. 

자세한 연동 테스트 방법 2020/10/20 - [주식 공부] - [주식거래자동화] 04. Django 에 PostgreSQL 연동 참고 !!

 

import psycopg2

if __name__ == "__main__":
    conn_string = "host='localhost' dbname='postgres' user='postgres' password='패스워드'"
    conn = psycopg2.connect(conn_string)
    cur = conn.cursor()
    query = "SELECT * FROM TBL_DAILY_STOCK;"
    cur.execute(query)
    result = cur.fetchall()
    print(result)
    cur.close()
    conn.close()

 

위 코드 실행 결과

 

 

정상적으로 조회된 것을 확인할 수 있다.

 

3. Python 주식일봉차트조회요청 테스트 코드 작성

# main.py
import sys
from PyQt5.QtWidgets import *
from PyQt5.QAxContainer import *
from PyQt5.QtCore import *
import time
from DBDailyStock import *

TR_REQ_TIME_INTERVAL = 0.2

class Kiwoom(QAxWidget):
    def __init__(self):
        super().__init__()
        self._create_kiwoom_instance()
        self._set_signal_slots()
        self.total_count = 0
        self.stcode = 'NULL'
        self.DBDailyStock = DBDailyStock()

    def _create_kiwoom_instance(self):
        self.setControl("KHOPENAPI.KHOpenAPICtrl.1")

    def _set_signal_slots(self):
        self.OnEventConnect.connect(self._event_connect)
        self.OnReceiveTrData.connect(self._receive_tr_data)

    def comm_connect(self):
        self.dynamicCall("CommConnect()")
        self.login_event_loop = QEventLoop()
        self.login_event_loop.exec_()

    def _event_connect(self, err_code):
        if err_code == 0:
            print("connected")
        else:
            print("disconnected")

        self.login_event_loop.exit()

    def get_code_list_by_market(self, market):
        code_list = self.dynamicCall("GetCodeListByMarket(QString)", market)
        code_list = code_list.split(';')
        return code_list[:-1]

    def get_master_code_name(self, code):
        code_name = self.dynamicCall("GetMasterCodeName(QString)", code)
        return code_name

    def set_input_value(self, id, value):
        self.dynamicCall("SetInputValue(QString, QString)", id, value)

    def comm_rq_data(self, rqname, trcode, next, screen_no):
        self.dynamicCall("CommRqData(QString, QString, int, QString", rqname, trcode, next, screen_no)
        self.tr_event_loop = QEventLoop()
        self.tr_event_loop.exec_()

    def _comm_get_data(self, code, real_type, field_name, index, item_name):
        ret = self.dynamicCall("CommGetData(QString, QString, QString, int, QString", code,
                               real_type, field_name, index, item_name)
        return ret.strip()

    def _get_repeat_cnt(self, trcode, rqname):
        ret = self.dynamicCall("GetRepeatCnt(QString, QString)", trcode, rqname)
        return ret

    def _receive_tr_data(self, screen_no, rqname, trcode, record_name, next, unused1, unused2, unused3, unused4):
        if next == '2':
            self.remained_data = True
        else:
            self.remained_data = False

        if rqname == "opt10081_req":
            self._opt10081(rqname, trcode)

        try:
            self.tr_event_loop.exit()
        except AttributeError:
            pass

    def _opt10081(self, rqname, trcode):
        data_cnt = self._get_repeat_cnt(trcode, rqname)

        for i in range(data_cnt):
            date = self._comm_get_data(trcode, "", rqname, i, "일자")
            open = self._comm_get_data(trcode, "", rqname, i, "시가")
            high = self._comm_get_data(trcode, "", rqname, i, "고가")
            low = self._comm_get_data(trcode, "", rqname, i, "저가")
            close = self._comm_get_data(trcode, "", rqname, i, "현재가")
            volume = self._comm_get_data(trcode, "", rqname, i, "거래량")
            value = self._comm_get_data(trcode, "", rqname, i, "거래대금")
            self.total_count += 1
            print("%5s"%(self.total_count), date, open, high, low, close, volume, value)
            self.DBDailyStock.addDailyStock(self.stcode, date, close, open,high, low, volume, value)

if __name__ == "__main__":
    app = QApplication(sys.argv)
    kiwoom = Kiwoom()
    kiwoom.comm_connect()

    # opt10081 TR 요청
    kiwoom.stcode = "005930"
    kiwoom.set_input_value("종목코드", "005930")
    kiwoom.set_input_value("수정주가구분", 1)
    kiwoom.comm_rq_data("opt10081_req", "opt10081", 0, "0101")

    while kiwoom.remained_data == True:
        time.sleep(TR_REQ_TIME_INTERVAL)
        kiwoom.set_input_value("종목코드", "005930")
        kiwoom.set_input_value("수정주가구분", 1)
        kiwoom.comm_rq_data("opt10081_req", "opt10081", 2, "0101")

 

위 코드로 테스트 조회 코드를 작성하였고 이는 아래 Wikidocs 페이지에 자세한 설명이 있다.

 

wikidocs.net/5756

 

위키독스

온라인 책을 제작 공유하는 플랫폼 서비스

wikidocs.net

 

4. 일봉데이터 PostgreSQL 저장

# DBDailyStock.py
import psycopg2

class DBDailyStock :
    def __init__(self) :
        conn_string = "host='localhost' dbname='postgres' user='postgres' password='패스워드'"
        self.conn = psycopg2.connect(conn_string)
        self.cur = self.conn.cursor()

    def __del__(self) :
        self.cur.close()
        self.conn.close()

    def getDailyStock(self) :
        query = "SELECT * FROM TBL_DAILY_STOCK;"
        self.cur.execute(query)
        result = self.cur.fetchall()
        print(result)

    def addDailyStock(self, STCODE ,DATE ,PRICE_CLOSE ,PRICE_OPEN ,PRICE_HIGH ,PRICE_LOW ,TRADING_VOLUME ,TRADING_VALUE) :
        query_format = "INSERT INTO TBL_DAILY_STOCK ( STCODE ,DATE ,PRICE_CLOSE ,PRICE_OPEN ,PRICE_HIGH ,PRICE_LOW ,TRADING_VOLUME ,TRADING_VALUE ) VALUES ('%s','%s',%s,%s,%s,%s,%s,%s);"
        query = query_format%(STCODE ,DATE ,PRICE_CLOSE ,PRICE_OPEN ,PRICE_HIGH ,PRICE_LOW ,TRADING_VOLUME ,TRADING_VALUE)
        self.cur.execute(query)
        self.conn.commit()

위 main.py 와 해당 DBDailyStock.py 파일을 같은 경로에 놓고 main.py 실행 시 TBL_DAILY_STOCK 에 데이터 저장을 수행한다.  수행 결과는 다음과 같다.

 

 

5. 저장된 데이터 확인

SELECT * FROM TBL_DAILY_STOCK LIMIT 10;

 

 

 

정상적으로 조회한 데이터가 입력된 것을 확인할 수 있다.

 

 

다음과 같은 방식으로 지정된 종목코드에 대해 DB에 저장하며 매번 키움증권API를 통해 조회하지 않고 과거 데이터를 DB에서 조회할 수 있다.

 

이후에는 저장된 데이터를 사용하여 일봉 그래프를 그리거나 투자가치가 있는 종목을 찾아보려 한다.

반응형