2023. 8. 9. 23:55ㆍTechnology[DevOps]
웹크롤링 데이터가 방대해 엘라스틱서치를 사용하지만 RDB만의 장점인 데이터의 무결성 및 일관성을 보장하고 정형화된 관계파악에 용이함을 가져가기 위해 웹크롤링 시 데이터는 RDB에 적재하고 Logstash로 RDB와 엘라스틱서치의 데이터를 동기화한 후 실제 웹서비스의 사용자가 검색 시 엘라스틱서치에서 검색결과를 반환하도록 해서 엘라스틱서치와 RDB의 장점을 모두 이용하려 합니다. 이를 위해 Logstash를 이용해 RDB와 엘라스틱서치의 데이터 동기화 관련해서 작성해보겠습니다.
1. Logstash config
input{
beats{port => 5044}
jdbc{
jdbc_validate_connection => true
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_driver_library => "/usr/share/java/mysql-connector-java-5.1.12.jar"
jdbc_connection_string => "[ jdbc_connection_string_required ]"
jdbc_user => "[ jdbc_user_required ]"
jdbc_password => "[ jdbc_password_required ]"
jdbc_paging_enabled => true
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/5 * * * * *"
statement => "SELECT id,name,price,amount,discountamount,discountrate,deliveryfee,sellid,image,detail,modification_time,insertion_time,is_deleted, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM goods WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
}
}
filter {
mutate {
copy => { "id" => "[@metadata][_id]"}
remove_field => ["id", "@version", "unix_ts_in_secs"]
}
}
output{
elasticsearch {
hosts => ["[ elasticsearch_hosts_required ]"]
ssl => false
index => "goods"
document_id => "%{[@metadata][_id]}"
ilm_enabled => false
}
stdout{}
}
위 설정파일에서
(1). beats{port => 5044} 로 5044번 포트에서 Logstash가 실행하도록 설정하고
(2). jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_driver_library => "/usr/share/java/mysql-connector-java-5.1.12.jar" 로 mysql 드라이버를 사용하고 해당 드라이버 의 jar 파일 위치를 설정하고
(3). jdbc_connection_string => "[ jdbc_connection_string_required ]"
jdbc_user => "[ jdbc_user_required ]"
jdbc_password => "[ jdbc_password_required ]" 로 mysql 연결주소 및 연결정보를 입력하고
(4). tracking_column => "unix_ts_in_secs" 으로 설정해서 :sql_last_value 파라미터로 받는 컬럼을 설정하고
(5). schedule => "*/5 * * * * *" 으로 설정해서 매 5초마다 실행되도록 설정하고
(6). statement => "SELECT id,name,price,amount,discountamount,discountrate,deliveryfee,sellid,image,detail,modification_time,insertion_time,is_deleted, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM goods WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC" 매번 이 쿼리가 실행될텐데 이 때 UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs 컬럼을 tracking_column으로 설정해서 다음에 실행될 때 (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) 이 조건에 :sql_last_value를 이전에 실행되었을때의 UNIX_TIMESTAMP(modification_time) 데이터를 받아서 파라미터로 사용하므로 이전에 실행되었을때의 modification_time보다 큰 데이터 & NOW() 지금보다 작은 데이터만 SELECT 해서 데이터 동기화하므로 중복데이터 없이 동기화가 가능하도록 합니다.
(7). copy => { "id" => "[@metadata][_id]"} 로 id컬럼이 metadata의 _id와 매핑되고
(8). remove_field => ["id", "@version", "unix_ts_in_secs"] 로 "id", "@version", "unix_ts_in_secs" 이 컬럼들은 엘라스틱서치에 적재되지 않도록 하고
(9). hosts => ["[ elasticsearch_hosts_required ]"] 엘라스틱서치의 호스트 연결주소를 입력하고
(10). index => "goods" 엘라스틱서치의 인덱스 이름을 입력합니다.
이상으로 엘라스틱서치와 RDB의 데이터 동기화를 구현하고 있는 Logstash에 대해서 알아보았습니다.
지금까지 읽어주셔서 감사합니다.
'Technology[DevOps]' 카테고리의 다른 글
Jenkins 복호화 (1) | 2023.08.14 |
---|---|
git-secret 암호화 (1) | 2023.08.14 |
NaverCloud Micro서버를 활용해 DB서버 구축하기 (1) | 2022.05.16 |
NaverCloudServer를 활용하여 React, Spring 등 파일 서버에 올리기 (1) | 2022.03.18 |
Naver ObjectStorage를 이용하여 이미지 저장서버 활용하기(2) (1) | 2022.03.14 |