Pivotal Greenplum Stream Server
1. 개요
- GPSS (Greenplum Streaming Server)는 클라이언트와 Greenplum Database간의 통신 및 전송을 관리합니다.
데이이터 로딩전에 GPSS 인스턴스를 구성하고 시작하면 GPSS를 이용할 수 있습니다.
- GPSS (Greenplum Streaming Server)는 ETL(추출,변환,적재)하는 툴입니다. 하나 이상의 클라이언트 클라이언트(현재는 kafka)에서 스트림 데이터를 수집하며, 데이터 소스 및 데이터 형식은 클라이언트별로 다릅니다
- Greenplum 6.5 버전까지는 kafka 데이터 소스만 지원합니다.
- 관련자료 : https://gpdb.docs.pivotal.io/streaming-server/1-3-4/overview.html
2. 아키텍처
- Greenplum Streaming Server는 gRPC 서버입니다. GPSS gRPC 서비스 정의에는 Greenplum Database에 연결하고 Greenplum 메타 데이터를 검사하는 데 필요한 작업 및 메시지가 포함됩니다. 서비스 정의에는 클라이언트에서 Greenplum Database 테이블로 데이터를 쓰는 데 필요한 조작 및 메시지를 포함합니다
- gRPC에 대한 자세한 내용은 gRPC 설명서를 참조하십시오.(https://grpc.io/docs/)
- 아키텍처 그림
https://gpdb.docs.pivotal.io/streaming-server/1-3-4/gpssarch.png
3. Greenplum Streaming Server 설치 준비
1) GPSS 다운로드
- network.pivotal.io
- Pivotal Greenplum Streaming Server 에서 패키지 다운로드
2) GPSS 설치 파일
- gpss-gpdb6-1.3.4-rhel7-x86_64.gppkg : Greenplum 클러스터 모든 노드에 설치 하는 설치 파일
- gpss-gpdb6-1.3.4-rhel7-x86_64.tar.gz. : 단일 호스트의 Greenplum Database에 설치 파일
- gpss-gpdb6-1.3.4-rhel7-x86_64.rpm : ETL서버에서 설치시 사용되는 설치. 파일
==> gppkg와 tar.gz 패키지는 GPSS를 위하여 필요한 라이브러리 및 실행파일, 스크립트 파일
==> rpm 파일은 클라이언트 측 실행 파일, ETL 런타임 환경을 썰정하기 위한 용도
3) 설치 프로그램
- gpkafka : 단일 Greenplum-Kafka 커넥터를 이용하여 Kafka data를 Greenplum에 적재
- gpss : GPSS 인스턴스 구동
- gpsscli : GPSS 데이터 로드 잡을 관리 (submit, start, stop 등), 현재는 Kafka 데이터 소스만 지원
- kafkacat: kafka 테스트 및 디버그 유틸리티
4) 사전 준비
- GPSS 패키지 설치전에 GPSS 및 Greenplum-Kafka jobs 프로세스 중지
4. Greenplum Streaming Server 설치
1) Greenplum 서버에 설치
- Greenplum 프로세스가 떠 있는 상태에서 수행해야 함.
- 파일 복제
$ ssh gpadmin@gpmaster
$ . /usr/local/greenplum-db/greenplum_path.sh
$ gppkg -i gpss-gpdb6-1.3.4-rhel7-x86_64.gppkg
$ psql
gpadmin=# CREATE EXTENSION gpss;
CREATE EXTENSION
Time: 230.853 ms
gpadmin=#\q
$
2) ETL 서버에 설치
- ETL서버에서 root로 프로그램 설치
- yum 인스톨시 의존성 있는 패키지가 많음. (CentOS 최소 설치시:35개 rpm 추가 설치 됨.)
# ssh root@etl
# yum install gpss-gpdb6-1.3.4-rhel7-x86_64.rpm
# chown -R gpadmin:gpadmin /usr/local/gpss* ##<---- etl="" font="">---->
# yum install java -y
# java -version
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)
# su - gpadmin
$ vi ~/.bashrc
. /usr/local/gpss/gpss_path.sh. ## source 반영
$ . ~/.bashrc
5. Kafka 설치 (ETL 서버)
1) zookeeper 설치
# su - root
# cd /root
# wget https://downloads.apache.org/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
# cd /usr/local
# tar zxvf /root/apache-zookeeper-3.6.0-bin.tar.gz
# chown -R gpadmin:gpadmin apache-zookeeper-3.6.0-bin/
# ln -s apache-zookeeper-3.6.0-bin zookeeper
# mkdir /zdata
# echo 1 > /zdata/myid
# cd /usr/local/zookeeper/conf/
# cp zoo_sample.cfg zoo.cfg
# vi zoo.cfg
#dataDir=/tmp/zookeeper ###<< 아래 라인으로 수정
dataDir=/zdata
server.1=localhost:2888:3888 ###<< 라인 추가
# chown -R gpadmin:gpadmin /zdata
# chown -R gpadmin:gpadmin /usr/local/zookeeper*
2) zookeeper 기동
$ su - gpadmin
$ /usr/local/zookeeper/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
- 상태 확인
$ /usr/local/zookeeper/bin/zkServer.sh status
- 중지
$ /usr/local/zookeeper/bin/zkServer.sh stop
3) kafka 설치
# cd /root
# wgets http://apache.mirror.cdnetworks.com/kafka/2.4.1/kafka_2.13-2.4.1.tgz
# tar zxvf /root/kafka_2.13-2.4.1.tgz
# ln -s kafka_2.13-2.4.1 kafka
# mkdir /kdata1 /kdata2
# chown -R gpadmin:gpadmin /kdata*
# vi /usr/local/kafka/config/server.properties
#broker.id=0 ###<< 아래 라인으로 수정
broker.id=1
#log.dirs=/tmp/kafka-logs ###<< 아래 라인으로 수정
log.dirs=/kdata1,/kdata2
#zookeeper.connect=localhost:2181 ###<< 아래 라인으로 수정
zookeeper.connect=localhost:2181/greenplum-kafka
# chown -R gpadmin:gpadmin kafka*
4) kafka 기동
- 기동
$ su - gpadmin
$ /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
- 중지
$ /usr/local/kafka/bin/kafka-server-stop.sh
6. kafka / greenplum 연동 테스트
1) Greenplum 서버
- Test DB 생성
$ createdb testdb
$ psql testdb -c "CREATE TABLE data_from_kafka( customer_id int8, expenses decimal(9,2), tax_due decimal(7,2)) distributed by (customer_id)"
2) ETL 서버
- 방화벽 비활성화
# systemctl stop firewalld
- 테스트 데이터
$ vi /tmp/sample_data.csv
"1313131","12","1313.13"
"3535353","11","761.35"
"7979797","10","4489.00"
"7979797","11","18.72"
"3535353","10","6001.94"
"7979797","12","173.18"
"1313131","10","492.83"
"3535353","12","81.12"
"1313131","11","368.27"
3) kafka 토픽 생성 및 확인
- 토픽생성
$ /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181/greenplum-kafka --topic topic_for_gpkafka --partitions 1 --replication-factor 1 --create
Created topic topic_for_gpkafka.
- 확인
$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181/greenplum-kafka
topic_for_gpkafka
- 참고, 토픽 삭제
/usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181/greenplum-kafka --topic topic_for_gpkafka --delete
- 토픽에 데이터 적재
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_for_gpkafka < /tmp/sample_data.csv
- 카프카 적재된 데이터 확인
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_for_gpkafka --from-beginning
4) gpkafka 구성 (Greenplum 서버)
- kafka load 구성파일 설정
$ vi /home/gpadmin/loadcfg.yaml
[gpadmin@etl ~]$ vi /home/gpadmin/loadcfg.yaml
DATABASE: testdb
USER: gpadmin
HOST: mdw
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: etl:9092
TOPIC: topic_for_gpkafka
COLUMNS:
- NAME: cust_id
TYPE: int
- NAME: __IGNORED__
TYPE: int
- NAME: expenses
TYPE: decimal(9,2)
FORMAT: csv
ERROR_LIMIT: 125
OUTPUT:
TABLE: data_from_kafka
MAPPING:
- NAME: customer_id
EXPRESSION: cust_id
- NAME: expenses
EXPRESSION: expenses
- NAME: tax_due
EXPRESSION: expenses * .0725
COMMIT:
MINIMAL_INTERVAL: 100
[gpadmin@etl ~]$
5) gpkafka로 데이터 로드 (Greenplum 서버)
- 1회 수행
$ gpkafka load --quit-at-eof ./loadcfg.yaml
[gpadmin@mdw ~]$ gpkafka load --quit-at-eof ./loadcfg.yaml
20200401:18:05:11 gpkafka:gpadmin:mdw:103524-[INFO]:-gpfdist listening on 0.0.0.0:8080
20200401:18:05:11 gpkafka:gpadmin:mdw:103524-[INFO]:-External table "public"."gpkafkaloadext_be1c95c0f2b8cbddcdc1ba51d401d4d4" already exist, reuse.
20200401:18:05:11 gpkafka:gpadmin:mdw:103524-[INFO]:-Start job f840edb3c9eeb3f3ffd569364e6bd5a7: input, output
20200401:18:05:11 gpkafka:gpadmin:mdw:103524-[INFO]:-gpkafka job has started
StartTime EndTime MsgNum MsgSize InsertedRecords RejectedRecords Speed
2020-04-01T09:05:11.923504Z 2020-04-01T09:05:12.163744Z 9 217 9 0 903B/sec
2020-04-01T09:05:12.209568Z 2020-04-01T09:05:12.225395Z 0 0 0 0 0B/sec
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Job finished: f840edb3c9eeb3f3ffd569364e6bd5a7
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Target table: "public"."data_from_kafka"
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Inserted 9 rows
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Rejected 0 rows
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Broker: etl:9092
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Topic: topic_for_gpkafka
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Partition 0 at offset 8
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Job f840edb3c9eeb3f3ffd569364e6bd5a7, status JOB_STOPPED, errmsg [], time 2020-04-01T09:05:12.247960173Z
[gpadmin@mdw ~]$
- 연속 대기
$ gpkafka load ./loadcfg.yaml ### 포그라운드 수행
$ gpkafka load ./loadcfg.yaml > /home/gpadmin/log/loadcfg.yaml.out 2>&1 & ### 백그라운드 수행
7. GPSS 보안 및 인증 설정
1) 필요시 설정 필요
8. GPSS Json 데이터 로딩
1) Kafka / ETL 서버
etl$ ssh etl
- 토픽 생성
etl$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181/greenplum-kafka --replication-factor 1 --partitions 1 --topic topic_json_gpkafka
- 토픽 확인
etl$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181/greenplum-kafka
topic_for_gpkafka
topic_json_gpkafka
- 샘플 Json 데이터 생성
etl$ vi /tmp/sample_data.json
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
- 토픽에 Json 데이터 적재
etl$ /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_json_gpkafka < /tmp/sample_data.json
- Kafka의 데이터 확인
etl$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_json_gpkafka --from-beginning
2) Greenplum 서버
mdw$ ssh mdw
mdw$ . /usr/local/greenplum-db/greenplum_path.sh
mdw$ vi $MASTER_DATA_DIRECTORY/gpsscfg_ex.json
{
"ListenAddress": {
"Host": "",
"Port": 50007,
"SSL": false
},
"Gpfdist": {
"Host": "",
"Port": 8319
}
}
mdw$ mkdir -p $MASTER_DATA_DIRECTORY/gpsslogs
--GPSS Start 백그라운드
mdw$ gpss $MASTER_DATA_DIRECTORY/gpsscfg_ex.json --log-dir $MASTER_DATA_DIRECTORY/gpsslogs &
- 테이블 생성
mdw$ psql -d testdb -c "CREATE TABLE json_from_kafka( customer_id int8, month int4, amount_paid decimal(9,2) ) DISTRIBUTED BY (customer_id);"
- Yaml 생성
- BROKERS의 위치 확인 해야 함.
mdw$ vi ~/jsonload_cfg.yaml
DATABASE: testdb
USER: gpadmin
HOST: mdw
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: etl:9092
TOPIC: topic_json_gpkafka
COLUMNS:
- NAME: jdata
TYPE: json
FORMAT: json
ERROR_LIMIT: 10
OUTPUT:
TABLE: json_from_kafka
MAPPING:
- NAME: customer_id
EXPRESSION: (jdata->>'cust_id')::int
- NAME: month
EXPRESSION: (jdata->>'month')::int
- NAME: amount_paid
EXPRESSION: (jdata->>'expenses')::decimal
COMMIT:
MINIMAL_INTERVAL: 2000
- Kafka data load job 등록
mdw$ gpsscli submit --name kafkajson2gp --gpss-port 50007 ~/jsonload_cfg.yaml
- GPSS Jobs 모든 리스트 확인
mdw$ gpsscli list --all --gpss-port 50007
JobID GPHost GPPort DataBase Schema Table Topic Status
kafkajson2gp mdw 5432 testdb public json_from_kafka topic_json_gpkafka JOB_STOPPED
- GPSS kafkajson2gp job 스타트
mdw$ gpsscli start kafkajson2gp --gpss-port 50007
20200402:10:46:21 gpsscli:gpadmin:mdw:030066-[INFO]:-JobID: kafkajson2gp is started
- Greenplum 데이터 적재 확인
mdw$ psql -d testdb -c "SELECT * FROM public.json_from_kafka;"
customer_id | month | amount_paid
-------------+-------+-------------
1313131 | 12 | 1313.13
1313131 | 10 | 492.83
1313131 | 11 | 368.27
3535353 | 11 | 761.35
3535353 | 10 | 6001.94
3535353 | 12 | 81.12
7979797 | 10 | 4489.00
7979797 | 11 | 18.72
7979797 | 12 | 173.18
(9 rows)
- GPSS Jobs 중지 및 제거
mdw$ gpsscli stop kafkajson2gp --gpss-port 50007
mdw$ gpsscli remove kafkajson2gp --gpss-port 50007
9. GPSS Json 데이터 Merge (Upsert)
1) Kafka / ETL 서버
etl$ ssh etl
- 토픽 생성
etl$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181/greenplum-kafka --replication-factor 1 --partitions 1 --topic customer_orders
- 토픽 확인
etl$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181/greenplum-kafka
customer_orders. <<<<<<<<< 추가
topic_for_gpkafka
topic_json_gpkafka
- 샘플 Json 데이터 생성
etl$ vi /tmp/sample_customer_data.csv
"1313131","1000.00"
"4444444","99.13"
"1515151","500.05"
"6666666","1.12"
"1717171","3000.03"
- 토픽에 Json 데이터 적재
etl$ /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic customer_orders < /tmp/sample_customer_data.csv
- Kafka의 데이터 확인
etl$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic customer_orders --from-beginning
2) Greenplum 서버
mdw$ ssh mdw
mdw$ . /usr/local/greenplum-db/greenplum_path.sh
--재사용하기 때문에, 위에서 한 경우 추가작업 없음.
mdw$ vi $MASTER_DATA_DIRECTORY/gpsscfg_ex.json
{
"ListenAddress": {
"Host": "",
"Port": 50007,
"SSL": false
},
"Gpfdist": {
"Host": "",
"Port": 8319
}
}
mdw$ mkdir -p $MASTER_DATA_DIRECTORY/gpsslogs #<<<<<<<<<<< 재사용하기 때문에, 위에서 한 경우 추가작업 없음.
--GPSS Start 백그라운드
mdw$ gpss $MASTER_DATA_DIRECTORY/gpsscfg_ex.json --log-dir $MASTER_DATA_DIRECTORY/gpsslogs & #<<<<<<<<<<< 재사용하기 때문에, 위에서 한 경우 추가작업 없음.
- 테이블 생성 및 데이터 적재
- 주의 : EOF 뒤에 스페이스가 있으면 에러 발생
mdw$ psql -d testdb <
DROP TABLE IF EXISTS customer_orders_tbl;
CREATE TABLE customer_orders_tbl( id int8, amount decimal(9,2) ) DISTRIBUTED BY (id);
INSERT INTO customer_orders_tbl VALUES (1717171, 17.17);
INSERT INTO customer_orders_tbl VALUES (1515151, 15.15);
EOF
- Yaml 생성
- BROKERS의 위치 확인 해야 함.
mdw$ vi ~/custorders_cfg.yaml
DATABASE: testdb
USER: gpadmin
HOST: mdw
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: etl:9092
TOPIC: customer_orders
COLUMNS:
- NAME: id
TYPE: int
- NAME: order_amount
TYPE: decimal(9,2)
FORMAT: csv
ERROR_LIMIT: 25
OUTPUT:
TABLE: customer_orders_tbl
MODE: MERGE
MATCH_COLUMNS:
- id
UPDATE_COLUMNS:
- amount
MAPPING:
- NAME: id
EXPRESSION: id
- NAME: amount
EXPRESSION: order_amount
COMMIT:
MINIMAL_INTERVAL: 2000
- Kafka data load job 등록
mdw$ gpsscli submit --name orders1 --gpss-port 50007 ~/custorders_cfg.yaml
- GPSS Jobs 모든 리스트 확인
mdw$ gpsscli list --all --gpss-port 50007
JobID GPHost GPPort DataBase Schema Table Topic Status
kafkajson2gp mdw 5432 testdb public json_from_kafka topic_json_gpkafka JOB_RUNNING
orders1 mdw 5432 testdb public customer_orders_tbl customer_orders JOB_STOPPED
- GPSS kafkajson2gp job 스타트
mdw$ gpsscli start orders1 --gpss-port 50007
20200402:10:46:21 gpsscli:gpadmin:mdw:030066-[INFO]:-JobID: kafkajson2gp is started
- Greenplum 데이터 적재 확인
mdw$ psql -d testdb -c "SELECT * FROM customer_orders_tbl ORDER BY id;"
id | amount
---------+---------
1313131 | 1000.00
1515151 | 500.05 <<<<<<<< 15.15 에서 kafka데이터로 업데이트 됨.
1717171 | 3000.03 <<<<<<<< 17.17 에서 kafka데이터로 업데이트 됨.
4444444 | 99.13
6666666 | 1.12
(5 rows)
- GPSS Jobs 중지 및 제거
mdw$ gpsscli stop orders1 --gpss-port 50007
mdw$ gpsscli remove orders1 --gpss-port 50007
###################################################################################################################
10. GPSS Json 데이터 INSERT/UPDATE/DELETE
1) 데이터 셋 생성 (Greenplum)
mdw$ psql testdb << EOF
\timing off
\t
\a
\pset fieldsep ,
\o cdcdemo.csv
select i, 'v1', i, i, 'v1', i::char(4) from generate_series(0, 9999) as i;
select i, 'v1', i, i, 'n1', i::char(4) from generate_series(100, 400) as i;
select i, 'v2', i, i, 'v2', i::char(4) from generate_series(0, 9999) as i;
select i, 'v2', i, i, 'n2', i::char(4) from generate_series(100, 400) as i;
select i, 'v3', i, i, 'v3', i::char(4) from generate_series(0, 9999) as i;
select i, 'v3', i, i, 'n3', i::char(4) from generate_series(100, 400) as i;
select i, 'v4', i, i, 'v4', i::char(4) from generate_series(0, 9999) as i;
select i, 'v4', i, i, 'n4', i::char(4) from generate_series(100, 400) as i;
select i, 'v5', i, i, 'v5', i::char(4) from generate_series(0, 9999) as i;
select i, 'v5', i, i, 'n5', i::char(4) from generate_series(100, 400) as i;
select i, 'v3', NULL,NULL,NULL,1 from generate_series(600, 800) as i;
EOF
- etl 서버에 파일 전송
mdw$ scp cdcdemo.csv etl:/tmp
2) Kafka / ETL 서버
etl$ ssh etl
- 토픽 생성
etl$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181/greenplum-kafka --replication-factor 1 --partitions 1 --topic cdcdemo
- 토픽 확인
etl$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181/greenplum-kafka
cdcdemo <<<<<<<<< 추가
customer_orders
topic_for_gpkafka
topic_json_gpkafka
- 샘플 Json 데이터 생성 (github 에서 다운로드)
- 제일 마지막 컬럼이 delete f/g (1이면 delete)
etl$ head cdcdemo.csv
0,v1,0,0,v1,0,0
1,v1,1,1,v1,1,0
2,v1,2,2,v1,2,0
3,v1,3,3,v1,3,0
4,v1,4,4,v1,4,0
5,v1,5,5,v1,5,0
6,v1,6,6,v1,6,0
7,v1,7,7,v1,7,0
8,v1,8,8,v1,8,0
9,v1,9,9,v1,9,0
etl$ tail cdcdemo.csv
791,v3,,,,1
792,v3,,,,1
793,v3,,,,1
794,v3,,,,1
795,v3,,,,1
796,v3,,,,1
797,v3,,,,1
798,v3,,,,1
799,v3,,,,1
800,v3,,,,1
- 토픽에 Json 데이터 적재
etl$ /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic cdcdemo < /tmp/cdcdemo/cdcdemo.csv
- Kafka의 데이터 확인
etl$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdcdemo --from-beginning
2) Greenplum 서버
mdw$ ssh mdw
mdw$ . /usr/local/greenplum-db/greenplum_path.sh
--재사용하기 때문에, 위에서 한 경우 추가작업 없음.
mdw$ vi $MASTER_DATA_DIRECTORY/gpsscfg_ex.json
{
"ListenAddress": {
"Host": "",
"Port": 50007,
"SSL": false
},
"Gpfdist": {
"Host": "",
"Port": 8319
}
}
mdw$ mkdir -p $MASTER_DATA_DIRECTORY/gpsslogs #<<<<<<<<<<< 재사용하기 때문에, 위에서 한 경우 추가작업 없음.
--GPSS Start 백그라운드
mdw$ gpss $MASTER_DATA_DIRECTORY/gpsscfg_ex.json --log-dir $MASTER_DATA_DIRECTORY/gpsslogs & #<<<<<<<<<<< 재사용하기 때문에, 위에서 한 경우 추가작업 없음.
- 테이블 생성 및 데이터 적재
- 주의 : EOF 뒤에 스페이스가 있으면 에러 발생
mdw$ psql -d testdb <
drop table if exists public.testcdc;
create table public.testcdc (k1 int, k2 char(5), v1 int, v2 int, v3 char(2), v4 char(4), mark_del char) DISTRIBUTED BY (k1, k2);
EOF
- Yaml 생성
- BROKERS의 위치 확인 해야 함.
mdw$ vi ~/cdcdemo.yaml
DATABASE: testdb
USER: gpadmin
HOST: mdw
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: etl:9092
TOPIC: cdcdemo
COLUMNS:
- NAME: e_k1
TYPE: int
- NAME: e_k2
TYPE: char(5)
- NAME: e_v1
TYPE: int
- NAME: e_v2
TYPE: int
- NAME: e_v3
TYPE: char(2)
- NAME: e_v4
TYPE: char(5)
- NAME: e_v5
TYPE: char
FORMAT: csv
ERROR_LIMIT: 100
OUTPUT:
SCHEMA: public
TABLE: testcdc
MODE: merge
MATCH_COLUMNS:
- k1
- k2
UPDATE_COLUMNS:
- v1
- v2
- v3
- v4
- mark_del
MAPPING:
- NAME: k1
EXPRESSION: e_k1
- NAME: k2
EXPRESSION: e_k2
- NAME: v1
EXPRESSION: e_v1
- NAME: v2
EXPRESSION: e_v2
- NAME: v3
EXPRESSION: e_v3
- NAME: v4
EXPRESSION: e_v4
- NAME: mark_del
EXPRESSION: e_v5
COMMIT:
MINIMAL_INTERVAL: 2000
TASK:
POST_BATCH_SQL: delete from public.testcdc where mark_del = '1';
BATCH_INTERVAL: 1
- Kafka data load job 등록
mdw$ gpsscli submit --name cdcdemo --gpss-port 50007 ~/cdcdemo.yaml
- GPSS Jobs 모든 리스트 확인
mdw$ gpsscli list --all --gpss-port 50007
JobID GPHost GPPort DataBase Schema Table Topic Status
kafkajson2gp mdw 5432 testdb public json_from_kafka topic_json_gpkafka JOB_RUNNING
orders1 mdw 5432 testdb public customer_orders_tbl customer_orders JOB_RUNNING
cdcdemo mdw 5432 testdb public testcdc cdcdemo JOB_STOPPED <<< 추가
- Greenplum 데이터 0건 확인
mdw$ psql -d testdb -c "SELECT * FROM public.testcdc ORDER BY k1,k2;"
- GPSS cdcdemo job 스타트
mdw$ gpsscli start cdcdemo --gpss-port 50007
20200402:10:46:21 gpsscli:gpadmin:mdw:030066-[INFO]:-JobID: kafkajson2gp is started
- Greenplum 데이터 적재 확인
mdw$ psql -d testdb -c "SELECT * FROM public.testcdc ORDER BY k1,k2;"
mdw$ psql -d testdb -c "SELECT count(*) FROM public.testcdc ;"
- GPSS Jobs 중지 및 제거
mdw$ gpsscli stop cdcdemo --gpss-port 50007
mdw$ gpsscli remove cdcdemo --gpss-port 50007
11. 기타
1) gpss 데몬이 kill 될 경우 submit를 다시 해야 함.
2) 타겟 테이블에 truncate 할 경우 테이블 락 발생, Delete all은 상관 없음.
12. 에러 처리
1) 컬럼 개수가 맞지 않을 경우
에러 메시지 확인 - 컬럼
20200402:15:28:37 gpss:gpadmin:mdw:057928-[ERROR]:-Failed to execute batch: pq: segment reject limit reached, aborting operation (seg2 slice1 172.16.25.134:6000 pid=15976)
Use following query to access the detailed error:
SELECT cmdtime, errmsg, COALESCE(rawdata, encode(rawbytes, 'escape')) AS rawdata
FROM gp_read_error_log('"public"."gpkafkaloadext_6f2aafd95b6fcd4367ef91b66d0412bb"') WHERE cmdtime >= '2020-04-02 15:28:37.639269+09' AND cmdtime <= '2020-04-02 15:28:37.929376+09'
-- 에러 row 쿼리 조회
[gpadmin@mdw ~]$
testdb=# SELECT cmdtime, errmsg, COALESCE(rawdata, encode(rawbytes, 'escape')) AS rawdata
testdb-# FROM gp_read_error_log('"public"."gpkafkaloadext_6f2aafd95b6fcd4367ef91b66d0412bb"') WHERE cmdtime >= '2020-04-02 15:28:37.639269+09' AND cmdtime <= '2020-04-02 15:28:37.929376+09';
cmdtime | errmsg | rawdata
-------------------------------+--------------------------------+---------------------------
2020-04-02 15:28:37.67138+09 | missing data for column "e_v5" | 1200,v1,1200,1200,v1,1200
2020-04-02 15:28:37.67138+09 | missing data for column "e_v5" | 1201,v1,1201,1201,v1,1201
2020-04-02 15:28:37.67138+09 | missing data for column "e_v5" | 1202,v1,1202,1202,v1,1202
-- 에러 row 정리
testdb=# select gp_truncate_error_log ('"public"."gpkafkaloadext_6f2aafd95b6fcd4367ef91b66d0412bb"');
gp_truncate_error_log
-----------------------
t
(1 row)
Time: 26.875 ms
--에러 row 삭제 확인
testdb=# SELECT cmdtime, errmsg, COALESCE(rawdata, encode(rawbytes, 'escape')) AS rawdata
FROM gp_read_error_log('"public"."gpkafkaloadext_6f2aafd95b6fcd4367ef91b66d0412bb"') WHERE cmdtime >= '2020-04-02 15:28:37.639269+09' AND cmdtime <= '2020-04-02 15:28:37.929376+09';
cmdtime | errmsg | rawdata
---------+--------+---------
(0 rows)
Time: 4.795 ms
testdb=#
--토픽 리셋 정리
$ gpkafka load --force-reset-earliest ~/cdcdemo.yaml
$ gpkafka load --help
Usage:
gpkafka load [flags]
Flags:
--config string gpss json configuration file. it's preferred to be set.
--debug-port string enable pprof debug server at specified port
-f, --force force load config when submitting a job which is already running
--force-reset-earliest continue to load from earliest available message
--force-reset-latest continue to load from new messages
--force-reset-timestamp string continue to load from offset corresponding to the input timestamp, the timestamp should be millisecond format
--gpfdist-host string gpfdist host address. it will override the value of gpss config.
--gpfdist-port int32 gpfdist host port. it will override the value of gpss config.
-h, --help help for load
--name string job's name
--partition display progress info by partition
--quit-at-eof quit after reading kafka EOF
Global Flags:
-l, --log-dir string log directory, default is $HOME/gpAdminLogs
--verbose enable debug log
######### 참고
-- 토픽 메시지 삭제
$ /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181/greenplum-kafka --topic cdcdemo
--토픽 메시지 삭제 확인
$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdcdemo --from-beginning
1. 개요
- GPSS (Greenplum Streaming Server)는 클라이언트와 Greenplum Database간의 통신 및 전송을 관리합니다.
데이이터 로딩전에 GPSS 인스턴스를 구성하고 시작하면 GPSS를 이용할 수 있습니다.
- GPSS (Greenplum Streaming Server)는 ETL(추출,변환,적재)하는 툴입니다. 하나 이상의 클라이언트 클라이언트(현재는 kafka)에서 스트림 데이터를 수집하며, 데이터 소스 및 데이터 형식은 클라이언트별로 다릅니다
- Greenplum 6.5 버전까지는 kafka 데이터 소스만 지원합니다.
- 관련자료 : https://gpdb.docs.pivotal.io/streaming-server/1-3-4/overview.html
2. 아키텍처
- Greenplum Streaming Server는 gRPC 서버입니다. GPSS gRPC 서비스 정의에는 Greenplum Database에 연결하고 Greenplum 메타 데이터를 검사하는 데 필요한 작업 및 메시지가 포함됩니다. 서비스 정의에는 클라이언트에서 Greenplum Database 테이블로 데이터를 쓰는 데 필요한 조작 및 메시지를 포함합니다
- gRPC에 대한 자세한 내용은 gRPC 설명서를 참조하십시오.(https://grpc.io/docs/)
- 아키텍처 그림
https://gpdb.docs.pivotal.io/streaming-server/1-3-4/gpssarch.png
3. Greenplum Streaming Server 설치 준비
1) GPSS 다운로드
- network.pivotal.io
- Pivotal Greenplum Streaming Server 에서 패키지 다운로드
2) GPSS 설치 파일
- gpss-gpdb6-1.3.4-rhel7-x86_64.gppkg : Greenplum 클러스터 모든 노드에 설치 하는 설치 파일
- gpss-gpdb6-1.3.4-rhel7-x86_64.tar.gz. : 단일 호스트의 Greenplum Database에 설치 파일
- gpss-gpdb6-1.3.4-rhel7-x86_64.rpm : ETL서버에서 설치시 사용되는 설치. 파일
==> gppkg와 tar.gz 패키지는 GPSS를 위하여 필요한 라이브러리 및 실행파일, 스크립트 파일
==> rpm 파일은 클라이언트 측 실행 파일, ETL 런타임 환경을 썰정하기 위한 용도
3) 설치 프로그램
- gpkafka : 단일 Greenplum-Kafka 커넥터를 이용하여 Kafka data를 Greenplum에 적재
- gpss : GPSS 인스턴스 구동
- gpsscli : GPSS 데이터 로드 잡을 관리 (submit, start, stop 등), 현재는 Kafka 데이터 소스만 지원
- kafkacat: kafka 테스트 및 디버그 유틸리티
4) 사전 준비
- GPSS 패키지 설치전에 GPSS 및 Greenplum-Kafka jobs 프로세스 중지
4. Greenplum Streaming Server 설치
1) Greenplum 서버에 설치
- Greenplum 프로세스가 떠 있는 상태에서 수행해야 함.
- 파일 복제
$ ssh gpadmin@gpmaster
$ . /usr/local/greenplum-db/greenplum_path.sh
$ gppkg -i gpss-gpdb6-1.3.4-rhel7-x86_64.gppkg
$ psql
gpadmin=# CREATE EXTENSION gpss;
CREATE EXTENSION
Time: 230.853 ms
gpadmin=#\q
$
2) ETL 서버에 설치
- ETL서버에서 root로 프로그램 설치
- yum 인스톨시 의존성 있는 패키지가 많음. (CentOS 최소 설치시:35개 rpm 추가 설치 됨.)
# ssh root@etl
# yum install gpss-gpdb6-1.3.4-rhel7-x86_64.rpm
# chown -R gpadmin:gpadmin /usr/local/gpss* ##<---- etl="" font="">---->
# yum install java -y
# java -version
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)
# su - gpadmin
$ vi ~/.bashrc
. /usr/local/gpss/gpss_path.sh. ## source 반영
$ . ~/.bashrc
5. Kafka 설치 (ETL 서버)
1) zookeeper 설치
# su - root
# cd /root
# wget https://downloads.apache.org/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
# cd /usr/local
# tar zxvf /root/apache-zookeeper-3.6.0-bin.tar.gz
# chown -R gpadmin:gpadmin apache-zookeeper-3.6.0-bin/
# ln -s apache-zookeeper-3.6.0-bin zookeeper
# mkdir /zdata
# echo 1 > /zdata/myid
# cd /usr/local/zookeeper/conf/
# cp zoo_sample.cfg zoo.cfg
# vi zoo.cfg
#dataDir=/tmp/zookeeper ###<< 아래 라인으로 수정
dataDir=/zdata
server.1=localhost:2888:3888 ###<< 라인 추가
# chown -R gpadmin:gpadmin /zdata
# chown -R gpadmin:gpadmin /usr/local/zookeeper*
2) zookeeper 기동
$ su - gpadmin
$ /usr/local/zookeeper/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
- 상태 확인
$ /usr/local/zookeeper/bin/zkServer.sh status
- 중지
$ /usr/local/zookeeper/bin/zkServer.sh stop
3) kafka 설치
# cd /root
# wgets http://apache.mirror.cdnetworks.com/kafka/2.4.1/kafka_2.13-2.4.1.tgz
# tar zxvf /root/kafka_2.13-2.4.1.tgz
# ln -s kafka_2.13-2.4.1 kafka
# mkdir /kdata1 /kdata2
# chown -R gpadmin:gpadmin /kdata*
# vi /usr/local/kafka/config/server.properties
#broker.id=0 ###<< 아래 라인으로 수정
broker.id=1
#log.dirs=/tmp/kafka-logs ###<< 아래 라인으로 수정
log.dirs=/kdata1,/kdata2
#zookeeper.connect=localhost:2181 ###<< 아래 라인으로 수정
zookeeper.connect=localhost:2181/greenplum-kafka
# chown -R gpadmin:gpadmin kafka*
4) kafka 기동
- 기동
$ su - gpadmin
$ /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
- 중지
$ /usr/local/kafka/bin/kafka-server-stop.sh
6. kafka / greenplum 연동 테스트
1) Greenplum 서버
- Test DB 생성
$ createdb testdb
$ psql testdb -c "CREATE TABLE data_from_kafka( customer_id int8, expenses decimal(9,2), tax_due decimal(7,2)) distributed by (customer_id)"
2) ETL 서버
- 방화벽 비활성화
# systemctl stop firewalld
- 테스트 데이터
$ vi /tmp/sample_data.csv
"1313131","12","1313.13"
"3535353","11","761.35"
"7979797","10","4489.00"
"7979797","11","18.72"
"3535353","10","6001.94"
"7979797","12","173.18"
"1313131","10","492.83"
"3535353","12","81.12"
"1313131","11","368.27"
3) kafka 토픽 생성 및 확인
- 토픽생성
$ /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181/greenplum-kafka --topic topic_for_gpkafka --partitions 1 --replication-factor 1 --create
Created topic topic_for_gpkafka.
- 확인
$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181/greenplum-kafka
topic_for_gpkafka
- 참고, 토픽 삭제
/usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181/greenplum-kafka --topic topic_for_gpkafka --delete
- 토픽에 데이터 적재
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_for_gpkafka < /tmp/sample_data.csv
- 카프카 적재된 데이터 확인
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_for_gpkafka --from-beginning
4) gpkafka 구성 (Greenplum 서버)
- kafka load 구성파일 설정
$ vi /home/gpadmin/loadcfg.yaml
[gpadmin@etl ~]$ vi /home/gpadmin/loadcfg.yaml
DATABASE: testdb
USER: gpadmin
HOST: mdw
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: etl:9092
TOPIC: topic_for_gpkafka
COLUMNS:
- NAME: cust_id
TYPE: int
- NAME: __IGNORED__
TYPE: int
- NAME: expenses
TYPE: decimal(9,2)
FORMAT: csv
ERROR_LIMIT: 125
OUTPUT:
TABLE: data_from_kafka
MAPPING:
- NAME: customer_id
EXPRESSION: cust_id
- NAME: expenses
EXPRESSION: expenses
- NAME: tax_due
EXPRESSION: expenses * .0725
COMMIT:
MINIMAL_INTERVAL: 100
[gpadmin@etl ~]$
5) gpkafka로 데이터 로드 (Greenplum 서버)
- 1회 수행
$ gpkafka load --quit-at-eof ./loadcfg.yaml
[gpadmin@mdw ~]$ gpkafka load --quit-at-eof ./loadcfg.yaml
20200401:18:05:11 gpkafka:gpadmin:mdw:103524-[INFO]:-gpfdist listening on 0.0.0.0:8080
20200401:18:05:11 gpkafka:gpadmin:mdw:103524-[INFO]:-External table "public"."gpkafkaloadext_be1c95c0f2b8cbddcdc1ba51d401d4d4" already exist, reuse.
20200401:18:05:11 gpkafka:gpadmin:mdw:103524-[INFO]:-Start job f840edb3c9eeb3f3ffd569364e6bd5a7: input
20200401:18:05:11 gpkafka:gpadmin:mdw:103524-[INFO]:-gpkafka job has started
StartTime EndTime MsgNum MsgSize InsertedRecords RejectedRecords Speed
2020-04-01T09:05:11.923504Z 2020-04-01T09:05:12.163744Z 9 217 9 0 903B/sec
2020-04-01T09:05:12.209568Z 2020-04-01T09:05:12.225395Z 0 0 0 0 0B/sec
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Job finished: f840edb3c9eeb3f3ffd569364e6bd5a7
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Target table: "public"."data_from_kafka"
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Inserted 9 rows
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Rejected 0 rows
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Broker: etl:9092
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Topic: topic_for_gpkafka
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Partition 0 at offset 8
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Job f840edb3c9eeb3f3ffd569364e6bd5a7, status JOB_STOPPED, errmsg [], time 2020-04-01T09:05:12.247960173Z
[gpadmin@mdw ~]$
- 연속 대기
$ gpkafka load ./loadcfg.yaml ### 포그라운드 수행
$ gpkafka load ./loadcfg.yaml > /home/gpadmin/log/loadcfg.yaml.out 2>&1 & ### 백그라운드 수행
7. GPSS 보안 및 인증 설정
1) 필요시 설정 필요
8. GPSS Json 데이터 로딩
1) Kafka / ETL 서버
etl$ ssh etl
- 토픽 생성
etl$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181/greenplum-kafka --replication-factor 1 --partitions 1 --topic topic_json_gpkafka
- 토픽 확인
etl$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181/greenplum-kafka
topic_for_gpkafka
topic_json_gpkafka
- 샘플 Json 데이터 생성
etl$ vi /tmp/sample_data.json
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
- 토픽에 Json 데이터 적재
etl$ /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_json_gpkafka < /tmp/sample_data.json
- Kafka의 데이터 확인
etl$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_json_gpkafka --from-beginning
2) Greenplum 서버
mdw$ ssh mdw
mdw$ . /usr/local/greenplum-db/greenplum_path.sh
mdw$ vi $MASTER_DATA_DIRECTORY/gpsscfg_ex.json
{
"ListenAddress": {
"Host": "",
"Port": 50007,
"SSL": false
},
"Gpfdist": {
"Host": "",
"Port": 8319
}
}
mdw$ mkdir -p $MASTER_DATA_DIRECTORY/gpsslogs
--GPSS Start 백그라운드
mdw$ gpss $MASTER_DATA_DIRECTORY/gpsscfg_ex.json --log-dir $MASTER_DATA_DIRECTORY/gpsslogs &
- 테이블 생성
mdw$ psql -d testdb -c "CREATE TABLE json_from_kafka( customer_id int8, month int4, amount_paid decimal(9,2) ) DISTRIBUTED BY (customer_id);"
- Yaml 생성
- BROKERS의 위치 확인 해야 함.
mdw$ vi ~/jsonload_cfg.yaml
DATABASE: testdb
USER: gpadmin
HOST: mdw
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: etl:9092
TOPIC: topic_json_gpkafka
COLUMNS:
- NAME: jdata
TYPE: json
FORMAT: json
ERROR_LIMIT: 10
OUTPUT:
TABLE: json_from_kafka
MAPPING:
- NAME: customer_id
EXPRESSION: (jdata->>'cust_id')::int
- NAME: month
EXPRESSION: (jdata->>'month')::int
- NAME: amount_paid
EXPRESSION: (jdata->>'expenses')::decimal
COMMIT:
MINIMAL_INTERVAL: 2000
- Kafka data load job 등록
mdw$ gpsscli submit --name kafkajson2gp --gpss-port 50007 ~/jsonload_cfg.yaml
- GPSS Jobs 모든 리스트 확인
mdw$ gpsscli list --all --gpss-port 50007
JobID GPHost GPPort DataBase Schema Table Topic Status
kafkajson2gp mdw 5432 testdb public json_from_kafka topic_json_gpkafka JOB_STOPPED
- GPSS kafkajson2gp job 스타트
mdw$ gpsscli start kafkajson2gp --gpss-port 50007
20200402:10:46:21 gpsscli:gpadmin:mdw:030066-[INFO]:-JobID: kafkajson2gp is started
- Greenplum 데이터 적재 확인
mdw$ psql -d testdb -c "SELECT * FROM public.json_from_kafka;"
customer_id | month | amount_paid
-------------+-------+-------------
1313131 | 12 | 1313.13
1313131 | 10 | 492.83
1313131 | 11 | 368.27
3535353 | 11 | 761.35
3535353 | 10 | 6001.94
3535353 | 12 | 81.12
7979797 | 10 | 4489.00
7979797 | 11 | 18.72
7979797 | 12 | 173.18
(9 rows)
- GPSS Jobs 중지 및 제거
mdw$ gpsscli stop kafkajson2gp --gpss-port 50007
mdw$ gpsscli remove kafkajson2gp --gpss-port 50007
9. GPSS Json 데이터 Merge (Upsert)
1) Kafka / ETL 서버
etl$ ssh etl
- 토픽 생성
etl$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181/greenplum-kafka --replication-factor 1 --partitions 1 --topic customer_orders
- 토픽 확인
etl$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181/greenplum-kafka
customer_orders. <<<<<<<<< 추가
topic_for_gpkafka
topic_json_gpkafka
- 샘플 Json 데이터 생성
etl$ vi /tmp/sample_customer_data.csv
"1313131","1000.00"
"4444444","99.13"
"1515151","500.05"
"6666666","1.12"
"1717171","3000.03"
- 토픽에 Json 데이터 적재
etl$ /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic customer_orders < /tmp/sample_customer_data.csv
- Kafka의 데이터 확인
etl$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic customer_orders --from-beginning
2) Greenplum 서버
mdw$ ssh mdw
mdw$ . /usr/local/greenplum-db/greenplum_path.sh
--재사용하기 때문에, 위에서 한 경우 추가작업 없음.
mdw$ vi $MASTER_DATA_DIRECTORY/gpsscfg_ex.json
{
"ListenAddress": {
"Host": "",
"Port": 50007,
"SSL": false
},
"Gpfdist": {
"Host": "",
"Port": 8319
}
}
mdw$ mkdir -p $MASTER_DATA_DIRECTORY/gpsslogs #<<<<<<<<<<< 재사용하기 때문에, 위에서 한 경우 추가작업 없음.
--GPSS Start 백그라운드
mdw$ gpss $MASTER_DATA_DIRECTORY/gpsscfg_ex.json --log-dir $MASTER_DATA_DIRECTORY/gpsslogs & #<<<<<<<<<<< 재사용하기 때문에, 위에서 한 경우 추가작업 없음.
- 테이블 생성 및 데이터 적재
- 주의 : EOF 뒤에 스페이스가 있으면 에러 발생
mdw$ psql -d testdb <
DROP TABLE IF EXISTS customer_orders_tbl;
CREATE TABLE customer_orders_tbl( id int8, amount decimal(9,2) ) DISTRIBUTED BY (id);
INSERT INTO customer_orders_tbl VALUES (1717171, 17.17);
INSERT INTO customer_orders_tbl VALUES (1515151, 15.15);
EOF
- Yaml 생성
- BROKERS의 위치 확인 해야 함.
mdw$ vi ~/custorders_cfg.yaml
DATABASE: testdb
USER: gpadmin
HOST: mdw
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: etl:9092
TOPIC: customer_orders
COLUMNS:
- NAME: id
TYPE: int
- NAME: order_amount
TYPE: decimal(9,2)
FORMAT: csv
ERROR_LIMIT: 25
OUTPUT:
TABLE: customer_orders_tbl
MODE: MERGE
MATCH_COLUMNS:
- id
UPDATE_COLUMNS:
- amount
MAPPING:
- NAME: id
EXPRESSION: id
- NAME: amount
EXPRESSION: order_amount
COMMIT:
MINIMAL_INTERVAL: 2000
- Kafka data load job 등록
mdw$ gpsscli submit --name orders1 --gpss-port 50007 ~/custorders_cfg.yaml
- GPSS Jobs 모든 리스트 확인
mdw$ gpsscli list --all --gpss-port 50007
JobID GPHost GPPort DataBase Schema Table Topic Status
kafkajson2gp mdw 5432 testdb public json_from_kafka topic_json_gpkafka JOB_RUNNING
orders1 mdw 5432 testdb public customer_orders_tbl customer_orders JOB_STOPPED
- GPSS kafkajson2gp job 스타트
mdw$ gpsscli start orders1 --gpss-port 50007
20200402:10:46:21 gpsscli:gpadmin:mdw:030066-[INFO]:-JobID: kafkajson2gp is started
- Greenplum 데이터 적재 확인
mdw$ psql -d testdb -c "SELECT * FROM customer_orders_tbl ORDER BY id;"
id | amount
---------+---------
1313131 | 1000.00
1515151 | 500.05 <<<<<<<< 15.15 에서 kafka데이터로 업데이트 됨.
1717171 | 3000.03 <<<<<<<< 17.17 에서 kafka데이터로 업데이트 됨.
4444444 | 99.13
6666666 | 1.12
(5 rows)
- GPSS Jobs 중지 및 제거
mdw$ gpsscli stop orders1 --gpss-port 50007
mdw$ gpsscli remove orders1 --gpss-port 50007
###################################################################################################################
10. GPSS Json 데이터 INSERT/UPDATE/DELETE
1) 데이터 셋 생성 (Greenplum)
mdw$ psql testdb << EOF
\timing off
\t
\a
\pset fieldsep ,
\o cdcdemo.csv
select i, 'v1', i, i, 'v1', i::char(4) from generate_series(0, 9999) as i;
select i, 'v1', i, i, 'n1', i::char(4) from generate_series(100, 400) as i;
select i, 'v2', i, i, 'v2', i::char(4) from generate_series(0, 9999) as i;
select i, 'v2', i, i, 'n2', i::char(4) from generate_series(100, 400) as i;
select i, 'v3', i, i, 'v3', i::char(4) from generate_series(0, 9999) as i;
select i, 'v3', i, i, 'n3', i::char(4) from generate_series(100, 400) as i;
select i, 'v4', i, i, 'v4', i::char(4) from generate_series(0, 9999) as i;
select i, 'v4', i, i, 'n4', i::char(4) from generate_series(100, 400) as i;
select i, 'v5', i, i, 'v5', i::char(4) from generate_series(0, 9999) as i;
select i, 'v5', i, i, 'n5', i::char(4) from generate_series(100, 400) as i;
select i, 'v3', NULL,NULL,NULL,1 from generate_series(600, 800) as i;
EOF
- etl 서버에 파일 전송
mdw$ scp cdcdemo.csv etl:/tmp
2) Kafka / ETL 서버
etl$ ssh etl
- 토픽 생성
etl$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181/greenplum-kafka --replication-factor 1 --partitions 1 --topic cdcdemo
- 토픽 확인
etl$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181/greenplum-kafka
cdcdemo <<<<<<<<< 추가
customer_orders
topic_for_gpkafka
topic_json_gpkafka
- 샘플 Json 데이터 생성 (github 에서 다운로드)
- 제일 마지막 컬럼이 delete f/g (1이면 delete)
etl$ head cdcdemo.csv
0,v1,0,0,v1,0,0
1,v1,1,1,v1,1,0
2,v1,2,2,v1,2,0
3,v1,3,3,v1,3,0
4,v1,4,4,v1,4,0
5,v1,5,5,v1,5,0
6,v1,6,6,v1,6,0
7,v1,7,7,v1,7,0
8,v1,8,8,v1,8,0
9,v1,9,9,v1,9,0
etl$ tail cdcdemo.csv
791,v3,,,,1
792,v3,,,,1
793,v3,,,,1
794,v3,,,,1
795,v3,,,,1
796,v3,,,,1
797,v3,,,,1
798,v3,,,,1
799,v3,,,,1
800,v3,,,,1
- 토픽에 Json 데이터 적재
etl$ /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic cdcdemo < /tmp/cdcdemo/cdcdemo.csv
- Kafka의 데이터 확인
etl$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdcdemo --from-beginning
2) Greenplum 서버
mdw$ ssh mdw
mdw$ . /usr/local/greenplum-db/greenplum_path.sh
--재사용하기 때문에, 위에서 한 경우 추가작업 없음.
mdw$ vi $MASTER_DATA_DIRECTORY/gpsscfg_ex.json
{
"ListenAddress": {
"Host": "",
"Port": 50007,
"SSL": false
},
"Gpfdist": {
"Host": "",
"Port": 8319
}
}
mdw$ mkdir -p $MASTER_DATA_DIRECTORY/gpsslogs #<<<<<<<<<<< 재사용하기 때문에, 위에서 한 경우 추가작업 없음.
--GPSS Start 백그라운드
mdw$ gpss $MASTER_DATA_DIRECTORY/gpsscfg_ex.json --log-dir $MASTER_DATA_DIRECTORY/gpsslogs & #<<<<<<<<<<< 재사용하기 때문에, 위에서 한 경우 추가작업 없음.
- 테이블 생성 및 데이터 적재
- 주의 : EOF 뒤에 스페이스가 있으면 에러 발생
mdw$ psql -d testdb <
drop table if exists public.testcdc;
create table public.testcdc (k1 int, k2 char(5), v1 int, v2 int, v3 char(2), v4 char(4), mark_del char) DISTRIBUTED BY (k1, k2);
EOF
- Yaml 생성
- BROKERS의 위치 확인 해야 함.
mdw$ vi ~/cdcdemo.yaml
DATABASE: testdb
USER: gpadmin
HOST: mdw
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: etl:9092
TOPIC: cdcdemo
COLUMNS:
- NAME: e_k1
TYPE: int
- NAME: e_k2
TYPE: char(5)
- NAME: e_v1
TYPE: int
- NAME: e_v2
TYPE: int
- NAME: e_v3
TYPE: char(2)
- NAME: e_v4
TYPE: char(5)
- NAME: e_v5
TYPE: char
FORMAT: csv
ERROR_LIMIT: 100
OUTPUT:
SCHEMA: public
TABLE: testcdc
MODE: merge
MATCH_COLUMNS:
- k1
- k2
UPDATE_COLUMNS:
- v1
- v2
- v3
- v4
- mark_del
MAPPING:
- NAME: k1
EXPRESSION: e_k1
- NAME: k2
EXPRESSION: e_k2
- NAME: v1
EXPRESSION: e_v1
- NAME: v2
EXPRESSION: e_v2
- NAME: v3
EXPRESSION: e_v3
- NAME: v4
EXPRESSION: e_v4
- NAME: mark_del
EXPRESSION: e_v5
COMMIT:
MINIMAL_INTERVAL: 2000
TASK:
POST_BATCH_SQL: delete from public.testcdc where mark_del = '1';
BATCH_INTERVAL: 1
- Kafka data load job 등록
mdw$ gpsscli submit --name cdcdemo --gpss-port 50007 ~/cdcdemo.yaml
- GPSS Jobs 모든 리스트 확인
mdw$ gpsscli list --all --gpss-port 50007
JobID GPHost GPPort DataBase Schema Table Topic Status
kafkajson2gp mdw 5432 testdb public json_from_kafka topic_json_gpkafka JOB_RUNNING
orders1 mdw 5432 testdb public customer_orders_tbl customer_orders JOB_RUNNING
cdcdemo mdw 5432 testdb public testcdc cdcdemo JOB_STOPPED <<< 추가
- Greenplum 데이터 0건 확인
mdw$ psql -d testdb -c "SELECT * FROM public.testcdc ORDER BY k1,k2;"
- GPSS cdcdemo job 스타트
mdw$ gpsscli start cdcdemo --gpss-port 50007
20200402:10:46:21 gpsscli:gpadmin:mdw:030066-[INFO]:-JobID: kafkajson2gp is started
- Greenplum 데이터 적재 확인
mdw$ psql -d testdb -c "SELECT * FROM public.testcdc ORDER BY k1,k2;"
mdw$ psql -d testdb -c "SELECT count(*) FROM public.testcdc ;"
- GPSS Jobs 중지 및 제거
mdw$ gpsscli stop cdcdemo --gpss-port 50007
mdw$ gpsscli remove cdcdemo --gpss-port 50007
11. 기타
1) gpss 데몬이 kill 될 경우 submit를 다시 해야 함.
2) 타겟 테이블에 truncate 할 경우 테이블 락 발생, Delete all은 상관 없음.
12. 에러 처리
1) 컬럼 개수가 맞지 않을 경우
에러 메시지 확인 - 컬럼
20200402:15:28:37 gpss:gpadmin:mdw:057928-[ERROR]:-Failed to execute batch: pq: segment reject limit reached, aborting operation (seg2 slice1 172.16.25.134:6000 pid=15976)
Use following query to access the detailed error:
SELECT cmdtime, errmsg, COALESCE(rawdata, encode(rawbytes, 'escape')) AS rawdata
FROM gp_read_error_log('"public"."gpkafkaloadext_6f2aafd95b6fcd4367ef91b66d0412bb"') WHERE cmdtime >= '2020-04-02 15:28:37.639269+09' AND cmdtime <= '2020-04-02 15:28:37.929376+09'
-- 에러 row 쿼리 조회
[gpadmin@mdw ~]$
testdb=# SELECT cmdtime, errmsg, COALESCE(rawdata, encode(rawbytes, 'escape')) AS rawdata
testdb-# FROM gp_read_error_log('"public"."gpkafkaloadext_6f2aafd95b6fcd4367ef91b66d0412bb"') WHERE cmdtime >= '2020-04-02 15:28:37.639269+09' AND cmdtime <= '2020-04-02 15:28:37.929376+09';
cmdtime | errmsg | rawdata
-------------------------------+--------------------------------+---------------------------
2020-04-02 15:28:37.67138+09 | missing data for column "e_v5" | 1200,v1,1200,1200,v1,1200
2020-04-02 15:28:37.67138+09 | missing data for column "e_v5" | 1201,v1,1201,1201,v1,1201
2020-04-02 15:28:37.67138+09 | missing data for column "e_v5" | 1202,v1,1202,1202,v1,1202
-- 에러 row 정리
testdb=# select gp_truncate_error_log ('"public"."gpkafkaloadext_6f2aafd95b6fcd4367ef91b66d0412bb"');
gp_truncate_error_log
-----------------------
t
(1 row)
Time: 26.875 ms
--에러 row 삭제 확인
testdb=# SELECT cmdtime, errmsg, COALESCE(rawdata, encode(rawbytes, 'escape')) AS rawdata
FROM gp_read_error_log('"public"."gpkafkaloadext_6f2aafd95b6fcd4367ef91b66d0412bb"') WHERE cmdtime >= '2020-04-02 15:28:37.639269+09' AND cmdtime <= '2020-04-02 15:28:37.929376+09';
cmdtime | errmsg | rawdata
---------+--------+---------
(0 rows)
Time: 4.795 ms
testdb=#
--토픽 리셋 정리
$ gpkafka load --force-reset-earliest ~/cdcdemo.yaml
$ gpkafka load --help
Usage:
gpkafka load
Flags:
--config string gpss json configuration file. it's preferred to be set.
--debug-port string enable pprof debug server at specified port
-f, --force force load config when submitting a job which is already running
--force-reset-earliest continue to load from earliest available message
--force-reset-latest continue to load from new messages
--force-reset-timestamp string continue to load from offset corresponding to the input timestamp, the timestamp should be millisecond format
--gpfdist-host string gpfdist host address. it will override the value of gpss config.
--gpfdist-port int32 gpfdist host port. it will override the value of gpss config.
-h, --help help for load
--name string job's name
--partition display progress info by partition
--quit-at-eof quit after reading kafka EOF
Global Flags:
-l, --log-dir string log directory, default is $HOME/gpAdminLogs
--verbose enable debug log
######### 참고
-- 토픽 메시지 삭제
$ /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181/greenplum-kafka --topic cdcdemo
--토픽 메시지 삭제 확인
$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdcdemo --from-beginning
댓글 없음:
댓글 쓰기