2020년 4월 2일 목요일

Greenplum Streaming Server

Pivotal Greenplum Stream Server

1. 개요
  - GPSS (Greenplum Streaming Server)는 클라이언트와 Greenplum Database간의 통신 및 전송을 관리합니다.
    데이이터 로딩전에 GPSS 인스턴스를 구성하고 시작하면 GPSS를 이용할 수 있습니다.

  - GPSS (Greenplum Streaming Server)는 ETL(추출,변환,적재)하는 툴입니다. 하나 이상의 클라이언트 클라이언트(현재는 kafka)에서 스트림 데이터를 수집하며, 데이터 소스 및 데이터 형식은 클라이언트별로 다릅니다
  
  - Greenplum 6.5 버전까지는 kafka 데이터 소스만 지원합니다.
  
  - 관련자료 : https://gpdb.docs.pivotal.io/streaming-server/1-3-4/overview.html

2. 아키텍처
  - Greenplum Streaming Server는 gRPC 서버입니다. GPSS gRPC 서비스 정의에는 Greenplum Database에 연결하고 Greenplum 메타 데이터를 검사하는 데 필요한 작업 및 메시지가 포함됩니다. 서비스 정의에는 클라이언트에서 Greenplum Database 테이블로 데이터를 쓰는 데 필요한 조작 및 메시지를 포함합니다 
  - gRPC에 대한 자세한 내용은 gRPC 설명서를 참조하십시오.(https://grpc.io/docs/)

  - 아키텍처 그림
     https://gpdb.docs.pivotal.io/streaming-server/1-3-4/gpssarch.png


3. Greenplum Streaming Server 설치 준비
  1) GPSS 다운로드
     - network.pivotal.io 
     - Pivotal Greenplum Streaming Server 에서 패키지 다운로드

  2) GPSS 설치 파일
     - gpss-gpdb6-1.3.4-rhel7-x86_64.gppkg    : Greenplum 클러스터 모든 노드에 설치 하는 설치 파일
     - gpss-gpdb6-1.3.4-rhel7-x86_64.tar.gz.  : 단일 호스트의 Greenplum Database에 설치 파일
     - gpss-gpdb6-1.3.4-rhel7-x86_64.rpm      : ETL서버에서 설치시 사용되는 설치. 파일
      ==> gppkg와 tar.gz 패키지는 GPSS를 위하여 필요한 라이브러리 및 실행파일, 스크립트 파일 
      ==> rpm 파일은 클라이언트 측 실행 파일, ETL 런타임 환경을 썰정하기 위한 용도

  3) 설치 프로그램
     - gpkafka :  단일 Greenplum-Kafka 커넥터를 이용하여 Kafka data를 Greenplum에 적재
     - gpss    :  GPSS 인스턴스 구동
     - gpsscli :  GPSS 데이터 로드 잡을 관리 (submit, start, stop 등), 현재는 Kafka 데이터 소스만 지원
     - kafkacat:  kafka 테스트 및 디버그 유틸리티

  4) 사전 준비  
     - GPSS 패키지 설치전에 GPSS 및 Greenplum-Kafka jobs 프로세스 중지


4. Greenplum Streaming Server 설치 
   1) Greenplum 서버에 설치
     - Greenplum 프로세스가 떠 있는 상태에서 수행해야 함.
     - 파일 복제
     $ ssh gpadmin@gpmaster
     $ . /usr/local/greenplum-db/greenplum_path.sh
     $ gppkg -i gpss-gpdb6-1.3.4-rhel7-x86_64.gppkg
     $ psql
       gpadmin=# CREATE EXTENSION gpss;
       CREATE EXTENSION
       Time: 230.853 ms
       gpadmin=#\q
     $
   2) ETL 서버에 설치
     -  ETL서버에서 root로 프로그램 설치
     - yum 인스톨시 의존성 있는 패키지가 많음. (CentOS 최소 설치시:35개 rpm 추가 설치 됨.)
     # ssh root@etl
     # yum install gpss-gpdb6-1.3.4-rhel7-x86_64.rpm
     # chown -R gpadmin:gpadmin /usr/local/gpss*         ##<---- etl="" font="">
     # yum install java  -y
     # java -version
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)

     # su - gpadmin
     $ vi ~/.bashrc
       . /usr/local/gpss/gpss_path.sh.                   ## source 반영
     $ . ~/.bashrc 


5. Kafka 설치 (ETL 서버)
   1) zookeeper 설치 
      # su - root
      # cd /root
      # wget https://downloads.apache.org/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
      # cd /usr/local
      # tar zxvf /root/apache-zookeeper-3.6.0-bin.tar.gz
      # chown -R gpadmin:gpadmin apache-zookeeper-3.6.0-bin/
      # ln -s apache-zookeeper-3.6.0-bin zookeeper      
      # mkdir /zdata
      # echo 1 > /zdata/myid
      # cd /usr/local/zookeeper/conf/
      # cp zoo_sample.cfg zoo.cfg
      # vi zoo.cfg
        #dataDir=/tmp/zookeeper             ###<< 아래 라인으로 수정
        dataDir=/zdata
        server.1=localhost:2888:3888        ###<< 라인 추가

      # chown -R gpadmin:gpadmin /zdata
      # chown -R gpadmin:gpadmin /usr/local/zookeeper*        

   2) zookeeper 기동
      $ su - gpadmin
      $ /usr/local/zookeeper/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

      - 상태 확인
      $ /usr/local/zookeeper/bin/zkServer.sh status
      - 중지
      $ /usr/local/zookeeper/bin/zkServer.sh stop


   3) kafka 설치
      # cd /root
      # wgets http://apache.mirror.cdnetworks.com/kafka/2.4.1/kafka_2.13-2.4.1.tgz
      # tar zxvf /root/kafka_2.13-2.4.1.tgz
      # ln -s kafka_2.13-2.4.1 kafka
      # mkdir /kdata1 /kdata2
      # chown -R gpadmin:gpadmin /kdata*
      # vi /usr/local/kafka/config/server.properties
        #broker.id=0                       ###<< 아래 라인으로 수정
        broker.id=1

        #log.dirs=/tmp/kafka-logs          ###<< 아래 라인으로 수정
        log.dirs=/kdata1,/kdata2

        #zookeeper.connect=localhost:2181                ###<< 아래 라인으로 수정
        zookeeper.connect=localhost:2181/greenplum-kafka

      # chown -R gpadmin:gpadmin kafka*

   4) kafka 기동
      - 기동
      $ su - gpadmin
      $ /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
      - 중지
      $ /usr/local/kafka/bin/kafka-server-stop.sh


6. kafka / greenplum 연동 테스트
  1) Greenplum 서버
     - Test DB 생성
     $ createdb testdb
     $ psql testdb -c "CREATE TABLE data_from_kafka( customer_id int8, expenses decimal(9,2), tax_due decimal(7,2)) distributed by (customer_id)"
  
  2) ETL 서버 
    - 방화벽 비활성화 
    # systemctl stop firewalld

    - 테스트 데이터 
    $ vi /tmp/sample_data.csv
"1313131","12","1313.13"
"3535353","11","761.35"
"7979797","10","4489.00"
"7979797","11","18.72"
"3535353","10","6001.94"
"7979797","12","173.18"
"1313131","10","492.83"
"3535353","12","81.12"
"1313131","11","368.27"


  3) kafka 토픽 생성 및 확인
    - 토픽생성
          $ /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181/greenplum-kafka --topic topic_for_gpkafka --partitions 1 --replication-factor 1 --create
          
          Created topic topic_for_gpkafka.
        - 확인
          $ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181/greenplum-kafka
          
          topic_for_gpkafka
        - 참고, 토픽 삭제
          /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181/greenplum-kafka --topic topic_for_gpkafka --delete

        - 토픽에 데이터 적재
          /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_for_gpkafka < /tmp/sample_data.csv
        
        - 카프카 적재된 데이터 확인
          /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic  topic_for_gpkafka --from-beginning


  4) gpkafka 구성 (Greenplum 서버)
     - kafka load 구성파일 설정
     $ vi /home/gpadmin/loadcfg.yaml
[gpadmin@etl ~]$ vi /home/gpadmin/loadcfg.yaml
DATABASE: testdb
USER: gpadmin
HOST: mdw
PORT: 5432
KAFKA:
   INPUT:
     SOURCE:
        BROKERS: etl:9092
        TOPIC: topic_for_gpkafka
     COLUMNS:
        - NAME: cust_id
          TYPE: int
        - NAME: __IGNORED__
          TYPE: int
        - NAME: expenses
          TYPE: decimal(9,2)
     FORMAT: csv
     ERROR_LIMIT: 125
   OUTPUT:
     TABLE: data_from_kafka
     MAPPING:
        - NAME: customer_id
          EXPRESSION: cust_id
        - NAME: expenses
          EXPRESSION: expenses
        - NAME: tax_due
          EXPRESSION: expenses * .0725
   COMMIT:
     MINIMAL_INTERVAL: 100
[gpadmin@etl ~]$ 

  5) gpkafka로 데이터 로드 (Greenplum 서버)
       - 1회 수행
       $ gpkafka load --quit-at-eof ./loadcfg.yaml  

[gpadmin@mdw ~]$ gpkafka load --quit-at-eof ./loadcfg.yaml
20200401:18:05:11 gpkafka:gpadmin:mdw:103524-[INFO]:-gpfdist listening on 0.0.0.0:8080
20200401:18:05:11 gpkafka:gpadmin:mdw:103524-[INFO]:-External table "public"."gpkafkaloadext_be1c95c0f2b8cbddcdc1ba51d401d4d4" already exist, reuse.
20200401:18:05:11 gpkafka:gpadmin:mdw:103524-[INFO]:-Start job f840edb3c9eeb3f3ffd569364e6bd5a7: input, output
20200401:18:05:11 gpkafka:gpadmin:mdw:103524-[INFO]:-gpkafka job has started
StartTime     EndTime       MsgNum    MsgSize   InsertedRecords RejectedRecords Speed
2020-04-01T09:05:11.923504Z 2020-04-01T09:05:12.163744Z 9           217             9           0           903B/sec
2020-04-01T09:05:12.209568Z 2020-04-01T09:05:12.225395Z 0           0               0           0           0B/sec
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Job finished: f840edb3c9eeb3f3ffd569364e6bd5a7
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Target table: "public"."data_from_kafka"
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Inserted 9 rows
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Rejected 0 rows
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Broker: etl:9092
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Topic: topic_for_gpkafka
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Partition 0 at offset 8
20200401:18:05:12 gpkafka:gpadmin:mdw:103524-[INFO]:-Job f840edb3c9eeb3f3ffd569364e6bd5a7, status JOB_STOPPED, errmsg [], time 2020-04-01T09:05:12.247960173Z
[gpadmin@mdw ~]$

       - 연속 대기
       $ gpkafka load ./loadcfg.yaml      ### 포그라운드 수행
       $ gpkafka load ./loadcfg.yaml > /home/gpadmin/log/loadcfg.yaml.out 2>&1 &  ### 백그라운드 수행




7. GPSS 보안 및 인증 설정
   1) 필요시 설정 필요


8. GPSS Json 데이터 로딩  
  1) Kafka / ETL 서버
    etl$ ssh etl
    
    - 토픽 생성
    etl$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181/greenplum-kafka --replication-factor 1 --partitions 1 --topic topic_json_gpkafka

    - 토픽 확인
    etl$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181/greenplum-kafka
    topic_for_gpkafka
    topic_json_gpkafka
    
    - 샘플 Json 데이터 생성
    etl$ vi /tmp/sample_data.json
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }

   - 토픽에 Json 데이터 적재   
   etl$  /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_json_gpkafka < /tmp/sample_data.json

   - Kafka의 데이터 확인
   etl$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_json_gpkafka --from-beginning

  2) Greenplum 서버
   mdw$ ssh mdw
   mdw$ . /usr/local/greenplum-db/greenplum_path.sh
   mdw$ vi $MASTER_DATA_DIRECTORY/gpsscfg_ex.json
{
    "ListenAddress": {
        "Host": "",
        "Port": 50007,
        "SSL": false
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319
    }
}
   mdw$ mkdir -p $MASTER_DATA_DIRECTORY/gpsslogs
   
   --GPSS Start 백그라운드
   mdw$ gpss $MASTER_DATA_DIRECTORY/gpsscfg_ex.json --log-dir $MASTER_DATA_DIRECTORY/gpsslogs &

   - 테이블 생성
   mdw$ psql -d testdb -c "CREATE TABLE json_from_kafka( customer_id int8, month int4, amount_paid decimal(9,2) ) DISTRIBUTED BY (customer_id);"

   - Yaml 생성
   - BROKERS의 위치 확인 해야 함.
   mdw$ vi ~/jsonload_cfg.yaml
DATABASE: testdb
USER: gpadmin
HOST: mdw
PORT: 5432
KAFKA:
   INPUT:
     SOURCE:
        BROKERS: etl:9092
        TOPIC: topic_json_gpkafka
     COLUMNS:
        - NAME: jdata
          TYPE: json
     FORMAT: json
     ERROR_LIMIT: 10
   OUTPUT:
     TABLE: json_from_kafka
     MAPPING:
        - NAME: customer_id
          EXPRESSION: (jdata->>'cust_id')::int
        - NAME: month
          EXPRESSION: (jdata->>'month')::int
        - NAME: amount_paid
          EXPRESSION: (jdata->>'expenses')::decimal
   COMMIT:
     MINIMAL_INTERVAL: 2000

   - Kafka data load job 등록 
   mdw$ gpsscli submit --name kafkajson2gp --gpss-port 50007 ~/jsonload_cfg.yaml

   - GPSS Jobs 모든 리스트 확인 
   mdw$ gpsscli list --all --gpss-port 50007
        JobID           GPHost   GPPort  DataBase  Schema     Table              Topic               Status
        kafkajson2gp    mdw      5432    testdb    public     json_from_kafka    topic_json_gpkafka  JOB_STOPPED

   - GPSS kafkajson2gp job 스타트 
   mdw$ gpsscli start kafkajson2gp --gpss-port 50007
   20200402:10:46:21 gpsscli:gpadmin:mdw:030066-[INFO]:-JobID: kafkajson2gp is started

   - Greenplum 데이터 적재 확인
   mdw$ psql -d testdb -c "SELECT * FROM public.json_from_kafka;"
 customer_id | month | amount_paid
-------------+-------+-------------
     1313131 |    12 |     1313.13
     1313131 |    10 |      492.83
     1313131 |    11 |      368.27
     3535353 |    11 |      761.35
     3535353 |    10 |     6001.94
     3535353 |    12 |       81.12
     7979797 |    10 |     4489.00
     7979797 |    11 |       18.72
     7979797 |    12 |      173.18
(9 rows)
   
   - GPSS Jobs 중지 및 제거
   mdw$ gpsscli stop kafkajson2gp --gpss-port 50007
   mdw$ gpsscli remove kafkajson2gp --gpss-port 50007



9. GPSS Json 데이터 Merge (Upsert)
  1) Kafka / ETL 서버
    etl$ ssh etl
    
    - 토픽 생성
    etl$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181/greenplum-kafka --replication-factor 1 --partitions 1 --topic customer_orders

    - 토픽 확인
    etl$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181/greenplum-kafka
    customer_orders.   <<<<<<<<< 추가
    topic_for_gpkafka
    topic_json_gpkafka
    
    - 샘플 Json 데이터 생성
    etl$ vi /tmp/sample_customer_data.csv
"1313131","1000.00"
"4444444","99.13"
"1515151","500.05"
"6666666","1.12"
"1717171","3000.03"

   - 토픽에 Json 데이터 적재   
   etl$  /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic customer_orders < /tmp/sample_customer_data.csv

   - Kafka의 데이터 확인
   etl$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic customer_orders --from-beginning

  2) Greenplum 서버
   mdw$ ssh mdw
   mdw$ . /usr/local/greenplum-db/greenplum_path.sh

   --재사용하기 때문에, 위에서 한 경우 추가작업 없음.
   mdw$ vi $MASTER_DATA_DIRECTORY/gpsscfg_ex.json
{
    "ListenAddress": {
        "Host": "",
        "Port": 50007,
        "SSL": false
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319
    }
}

   mdw$ mkdir -p $MASTER_DATA_DIRECTORY/gpsslogs    #<<<<<<<<<<< 재사용하기 때문에, 위에서 한 경우 추가작업 없음.
   
   --GPSS Start 백그라운드
   mdw$ gpss $MASTER_DATA_DIRECTORY/gpsscfg_ex.json --log-dir $MASTER_DATA_DIRECTORY/gpsslogs &  #<<<<<<<<<<< 재사용하기 때문에, 위에서 한 경우 추가작업 없음.

   - 테이블 생성 및 데이터 적재
   - 주의 : EOF 뒤에 스페이스가 있으면 에러 발생
   mdw$ psql -d testdb <
DROP TABLE IF EXISTS customer_orders_tbl; 
CREATE TABLE customer_orders_tbl( id int8, amount decimal(9,2) ) DISTRIBUTED BY (id);
INSERT INTO customer_orders_tbl VALUES (1717171, 17.17);
INSERT INTO customer_orders_tbl VALUES (1515151, 15.15);
EOF


   - Yaml 생성
   - BROKERS의 위치 확인 해야 함.
   mdw$ vi ~/custorders_cfg.yaml
DATABASE: testdb
USER: gpadmin
HOST: mdw
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: etl:9092
        TOPIC: customer_orders
      COLUMNS:
        - NAME: id
          TYPE: int
        - NAME: order_amount
          TYPE: decimal(9,2)
      FORMAT: csv
      ERROR_LIMIT: 25
   OUTPUT:
      TABLE: customer_orders_tbl
      MODE: MERGE
      MATCH_COLUMNS:
        - id
      UPDATE_COLUMNS:
        - amount
      MAPPING:
        - NAME: id
          EXPRESSION: id
        - NAME: amount
          EXPRESSION: order_amount
   COMMIT:
      MINIMAL_INTERVAL: 2000

   - Kafka data load job 등록 
   mdw$ gpsscli submit --name orders1 --gpss-port 50007 ~/custorders_cfg.yaml

   - GPSS Jobs 모든 리스트 확인 
   mdw$ gpsscli list --all --gpss-port 50007
        JobID           GPHost   GPPort  DataBase  Schema     Table              Topic               Status
        kafkajson2gp    mdw      5432    testdb    public     json_from_kafka    topic_json_gpkafka  JOB_RUNNING
        orders1         mdw      5432    testdb    public     customer_orders_tbl customer_orders    JOB_STOPPED

   - GPSS kafkajson2gp job 스타트 
   mdw$ gpsscli start orders1 --gpss-port 50007
   20200402:10:46:21 gpsscli:gpadmin:mdw:030066-[INFO]:-JobID: kafkajson2gp is started

   - Greenplum 데이터 적재 확인
   mdw$ psql -d testdb -c "SELECT * FROM customer_orders_tbl ORDER BY id;"
   id    | amount
---------+---------
 1313131 | 1000.00
 1515151 |  500.05     <<<<<<<< 15.15 에서 kafka데이터로 업데이트 됨.
 1717171 | 3000.03     <<<<<<<< 17.17 에서 kafka데이터로 업데이트 됨.
 4444444 |   99.13
 6666666 |    1.12
(5 rows)
   
   - GPSS Jobs 중지 및 제거
   mdw$ gpsscli stop orders1 --gpss-port 50007
   mdw$ gpsscli remove orders1 --gpss-port 50007

###################################################################################################################

10. GPSS Json 데이터 INSERT/UPDATE/DELETE
 1) 데이터 셋 생성 (Greenplum)

 mdw$ psql testdb << EOF
\timing off
\t
\a
\pset fieldsep ,
\o cdcdemo.csv
select i, 'v1', i, i, 'v1', i::char(4) from generate_series(0, 9999) as i;
select i, 'v1', i, i, 'n1', i::char(4) from generate_series(100, 400) as i;
select i, 'v2', i, i, 'v2', i::char(4) from generate_series(0, 9999) as i;
select i, 'v2', i, i, 'n2', i::char(4) from generate_series(100, 400) as i;
select i, 'v3', i, i, 'v3', i::char(4) from generate_series(0, 9999) as i;
select i, 'v3', i, i, 'n3', i::char(4) from generate_series(100, 400) as i;
select i, 'v4', i, i, 'v4', i::char(4) from generate_series(0, 9999) as i;
select i, 'v4', i, i, 'n4', i::char(4) from generate_series(100, 400) as i;
select i, 'v5', i, i, 'v5', i::char(4) from generate_series(0, 9999) as i;
select i, 'v5', i, i, 'n5', i::char(4) from generate_series(100, 400) as i;
select i, 'v3', NULL,NULL,NULL,1 from generate_series(600, 800) as i;
EOF

   - etl 서버에 파일 전송 
 mdw$ scp cdcdemo.csv etl:/tmp

 2) Kafka / ETL 서버
    etl$ ssh etl
    
    - 토픽 생성
    etl$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181/greenplum-kafka --replication-factor 1 --partitions 1 --topic cdcdemo

    - 토픽 확인
    etl$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181/greenplum-kafka
    cdcdemo            <<<<<<<<< 추가
    customer_orders    
    topic_for_gpkafka
    topic_json_gpkafka
    
    - 샘플 Json 데이터 생성 (github 에서 다운로드)
    - 제일 마지막 컬럼이 delete f/g (1이면 delete)        
    etl$ head cdcdemo.csv
0,v1,0,0,v1,0,0
1,v1,1,1,v1,1,0
2,v1,2,2,v1,2,0
3,v1,3,3,v1,3,0
4,v1,4,4,v1,4,0
5,v1,5,5,v1,5,0
6,v1,6,6,v1,6,0
7,v1,7,7,v1,7,0
8,v1,8,8,v1,8,0
9,v1,9,9,v1,9,0
    etl$ tail cdcdemo.csv
791,v3,,,,1
792,v3,,,,1
793,v3,,,,1
794,v3,,,,1
795,v3,,,,1
796,v3,,,,1
797,v3,,,,1
798,v3,,,,1
799,v3,,,,1
800,v3,,,,1

   - 토픽에 Json 데이터 적재   
   etl$  /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic cdcdemo < /tmp/cdcdemo/cdcdemo.csv

   - Kafka의 데이터 확인
   etl$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdcdemo --from-beginning 

  2) Greenplum 서버
   mdw$ ssh mdw
   mdw$ . /usr/local/greenplum-db/greenplum_path.sh

   --재사용하기 때문에, 위에서 한 경우 추가작업 없음.
   mdw$ vi $MASTER_DATA_DIRECTORY/gpsscfg_ex.json
{
    "ListenAddress": {
        "Host": "",
        "Port": 50007,
        "SSL": false
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319
    }
}

   mdw$ mkdir -p $MASTER_DATA_DIRECTORY/gpsslogs    #<<<<<<<<<<< 재사용하기 때문에, 위에서 한 경우 추가작업 없음.
   
   --GPSS Start 백그라운드
   mdw$ gpss $MASTER_DATA_DIRECTORY/gpsscfg_ex.json --log-dir $MASTER_DATA_DIRECTORY/gpsslogs &  #<<<<<<<<<<< 재사용하기 때문에, 위에서 한 경우 추가작업 없음.

   - 테이블 생성 및 데이터 적재
   - 주의 : EOF 뒤에 스페이스가 있으면 에러 발생
   mdw$ psql -d testdb <
drop table if exists public.testcdc;
create table public.testcdc (k1 int, k2 char(5), v1 int, v2 int, v3 char(2), v4 char(4), mark_del char) DISTRIBUTED BY (k1, k2);
EOF


   - Yaml 생성
   - BROKERS의 위치 확인 해야 함.
   mdw$ vi ~/cdcdemo.yaml
DATABASE: testdb
USER: gpadmin
HOST: mdw
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: etl:9092
        TOPIC: cdcdemo
      COLUMNS:
        - NAME: e_k1
          TYPE: int
        - NAME: e_k2
          TYPE: char(5)
        - NAME: e_v1
          TYPE: int
        - NAME: e_v2
          TYPE: int
        - NAME: e_v3
          TYPE: char(2)
        - NAME: e_v4
          TYPE: char(5)
        - NAME: e_v5
          TYPE: char
      FORMAT: csv
      ERROR_LIMIT: 100
   OUTPUT:
      SCHEMA: public
      TABLE: testcdc
      MODE: merge
      MATCH_COLUMNS:
        - k1
        - k2
      UPDATE_COLUMNS:
        - v1
        - v2
        - v3
        - v4
        - mark_del
      MAPPING:
        - NAME: k1
          EXPRESSION: e_k1
        - NAME: k2
          EXPRESSION: e_k2
        - NAME: v1
          EXPRESSION: e_v1
        - NAME: v2
          EXPRESSION: e_v2
        - NAME: v3
          EXPRESSION: e_v3
        - NAME: v4
          EXPRESSION: e_v4
        - NAME: mark_del
          EXPRESSION: e_v5
   COMMIT:
      MINIMAL_INTERVAL: 2000
   TASK:
      POST_BATCH_SQL: delete from public.testcdc where mark_del = '1';
      BATCH_INTERVAL: 1


   - Kafka data load job 등록 
   mdw$ gpsscli submit --name cdcdemo --gpss-port 50007 ~/cdcdemo.yaml

   - GPSS Jobs 모든 리스트 확인 
   mdw$ gpsscli list --all --gpss-port 50007
        JobID           GPHost   GPPort  DataBase  Schema     Table              Topic               Status
        kafkajson2gp    mdw      5432    testdb    public     json_from_kafka    topic_json_gpkafka  JOB_RUNNING
        orders1         mdw      5432    testdb    public     customer_orders_tbl customer_orders    JOB_RUNNING
        cdcdemo         mdw      5432    testdb    public     testcdc             cdcdemo            JOB_STOPPED <<< 추가


   - Greenplum 데이터 0건 확인
   mdw$ psql -d testdb -c "SELECT * FROM public.testcdc ORDER BY k1,k2;"


   - GPSS cdcdemo job 스타트 
   mdw$ gpsscli start cdcdemo --gpss-port 50007
   20200402:10:46:21 gpsscli:gpadmin:mdw:030066-[INFO]:-JobID: kafkajson2gp is started


   - Greenplum 데이터 적재 확인
   mdw$ psql -d testdb -c "SELECT * FROM public.testcdc ORDER BY k1,k2;"
   mdw$ psql -d testdb -c "SELECT count(*) FROM public.testcdc ;"
   
   - GPSS Jobs 중지 및 제거
   mdw$ gpsscli stop cdcdemo --gpss-port 50007
   mdw$ gpsscli remove cdcdemo --gpss-port 50007



11. 기타
   1) gpss 데몬이 kill 될 경우 submit를 다시 해야 함.
   2) 타겟 테이블에 truncate 할 경우 테이블 락 발생, Delete all은 상관 없음.


12.  에러 처리
   1) 컬럼 개수가 맞지 않을 경우 

      에러 메시지 확인 - 컬럼
    
      20200402:15:28:37 gpss:gpadmin:mdw:057928-[ERROR]:-Failed to execute batch: pq: segment reject limit reached, aborting operation  (seg2 slice1 172.16.25.134:6000 pid=15976)
      Use following query to access the detailed error:
      SELECT cmdtime, errmsg, COALESCE(rawdata, encode(rawbytes, 'escape')) AS rawdata
          FROM gp_read_error_log('"public"."gpkafkaloadext_6f2aafd95b6fcd4367ef91b66d0412bb"') WHERE cmdtime >= '2020-04-02 15:28:37.639269+09' AND cmdtime <= '2020-04-02 15:28:37.929376+09'
      
      -- 에러 row 쿼리 조회
      [gpadmin@mdw ~]$
      testdb=# SELECT cmdtime, errmsg, COALESCE(rawdata, encode(rawbytes, 'escape')) AS rawdata
      testdb-# FROM gp_read_error_log('"public"."gpkafkaloadext_6f2aafd95b6fcd4367ef91b66d0412bb"') WHERE cmdtime >= '2020-04-02 15:28:37.639269+09' AND cmdtime <= '2020-04-02 15:28:37.929376+09';
                  cmdtime            |             errmsg             |          rawdata
      -------------------------------+--------------------------------+---------------------------
       2020-04-02 15:28:37.67138+09  | missing data for column "e_v5" | 1200,v1,1200,1200,v1,1200
       2020-04-02 15:28:37.67138+09  | missing data for column "e_v5" | 1201,v1,1201,1201,v1,1201
       2020-04-02 15:28:37.67138+09  | missing data for column "e_v5" | 1202,v1,1202,1202,v1,1202
      
      -- 에러 row 정리
      testdb=# select gp_truncate_error_log ('"public"."gpkafkaloadext_6f2aafd95b6fcd4367ef91b66d0412bb"');
       gp_truncate_error_log
      -----------------------
       t
      (1 row)

      Time: 26.875 ms

      --에러 row 삭제 확인
      testdb=# SELECT cmdtime, errmsg, COALESCE(rawdata, encode(rawbytes, 'escape')) AS rawdata
      FROM gp_read_error_log('"public"."gpkafkaloadext_6f2aafd95b6fcd4367ef91b66d0412bb"') WHERE cmdtime >= '2020-04-02 15:28:37.639269+09' AND cmdtime <= '2020-04-02 15:28:37.929376+09';
       cmdtime | errmsg | rawdata
      ---------+--------+---------
      (0 rows)

      Time: 4.795 ms
      testdb=#

       --토픽 리셋 정리
       $ gpkafka load --force-reset-earliest ~/cdcdemo.yaml
       $ gpkafka load --help 
          Usage:
            gpkafka load [flags]

          Flags:
                --config string                  gpss json configuration file. it's preferred to be set.
                --debug-port string              enable pprof debug server at specified port
            -f, --force                          force load config when submitting a job which is already running
                --force-reset-earliest           continue to load from earliest available message
                --force-reset-latest             continue to load from new messages
                --force-reset-timestamp string   continue to load from offset corresponding to the input timestamp, the timestamp should be millisecond format
                --gpfdist-host string            gpfdist host address. it will override the value of gpss config.
                --gpfdist-port int32             gpfdist host port. it will override the value of gpss config.
            -h, --help                           help for load
                --name string                    job's name
                --partition                      display progress info by partition
                --quit-at-eof                    quit after reading kafka EOF

          Global Flags:
            -l, --log-dir string   log directory, default is $HOME/gpAdminLogs
                --verbose          enable debug log





######### 참고 
      -- 토픽 메시지 삭제 
 $ /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181/greenplum-kafka --topic cdcdemo

 --토픽 메시지 삭제 확인
 $ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdcdemo --from-beginning 

댓글 없음:

댓글 쓰기

Greenplum 6 마스터 Port 변경

Greenplum 마스터 노드 Port 변경 - 이미 설치된 Greenplum 클러스터에서 마스터 port 변경 절차 - https://knowledge.broadcom.com/ external /article?articleNumber= 296...