레이블이 GPDB7_프로시져인 게시물을 표시합니다. 모든 게시물 표시
레이블이 GPDB7_프로시져인 게시물을 표시합니다. 모든 게시물 표시

2024년 8월 19일 월요일

Greenplum 7 프로시저 및 upsert

 1. Greenplum 7 프로시저

Greenplum 7에서는 프로시저를 지원, Greenplum 6까지는 Function으로 프로시저를 지원하였음.

배치 업무를 위한 함수와 프로시저의 차이점

1) 함수로 프로시저를 만들 경우

- 함수내에서 COMMIT; Rollback을 사용할 수 없음.

- 즉, 함수로 프로시저를 수행하는 동안 어디까지 진행되는지를 job 이력 테이블로는 확인이 불가능

2) 프로시저를 이용하는 경우 - ONLY Greenplum 7

- 프로시저 중간에 COMMIT;을 이용하여 현재 진행상황을 Job 이력 테이블에 UPDATE 하여, 진행상황을 알 수 있음.

3) Greenplum 6에서 배치 함수의 진행상황 확인하는 방법

- 함수내에서 NOTICE를 이용하여, 메시지로 확인이 가능하였음.


2. Greenplum 7의 Upsert

Greenplum 7에서는 INSERT시 키 중복이 될 경우 update를 지원

아래 예제에서는 job 이력 로그에 job이름과 job 실행일자가 유니크하고,

재수행할 경우 업데이트로 수행 함.

INSERT INTO TABLE xxxx

VALUES ()

ON CONFLICT (키값) DO UPDATE

SET



--테스트 테이블

DROP TABLE IF EXISTS public.order_log;

CREATE TABLE public.order_log

(

order_no int,

cust_no int,

prod_nm varchar(20),

order_dt varchar(8)

)

WITH (appendonly=TRUE, compresstype=zstd, compresslevel=7)

DISTRIBUTED BY (order_no)

PARTITION BY RANGE (order_dt)

(

PARTITION p2001 start('20010101') END ('20020101'),

PARTITION p2002 start('20020101') END ('20030101'),

PARTITION p2003 start('20030101') END ('20040101'),

PARTITION p2004 start('20040101') END ('20050101'),

PARTITION p2005 start('20050101') END ('20060101')

)

;


--Job 수행 이력 로깅 테이블

DROP TABLE IF EXISTS public.tb_sp_job_log;

CREATE TABLE public.tb_sp_job_log (

job_nm varchar(63),

job_base_dt varchar(8),

job_start_ts timestamp,

job_end_ts timestamp,

rows bigint,

job_status char(1), --R:Running, S:Success, F:Failure

query text,

err_msg text

)

DISTRIBUTED BY (job_nm, job_base_dt);


CREATE UNIQUE INDEX ixu_tb_sp_job_log ON public.tb_sp_job_log(job_nm, job_base_dt);


-- Job 수행 프로시저

DROP PROCEDURE public.sp_test_upsert(v_date IN varchar(8)) ;

CREATE OR REPLACE PROCEDURE public.sp_test_upsert(v_date IN varchar(8))

LANGUAGE plpgsql

AS

$$

DECLARE

v_tmp text;

v_job_nm text;

v_rows integer;

v_sql text;


v_err_msg text;

v_err_cd TEXT;

v_err_context TEXT;

BEGIN

--현재 프로시져 명을 추출

GET DIAGNOSTICS v_tmp = PG_CONTEXT;

v_job_nm := split_part((substring(v_tmp from 'function (.*?) line'))::regprocedure::text, '(', 1);

--select usename, sess_id into v_usr, v_ssid from pg_stat_activity where pid = pg_backend_pid();

-- 처음 작업할때에는 insert

-- 키 값이 중복될 경우 update 수행 (키 값은 job_nm, job_base_dt)

INSERT INTO public.tb_sp_job_log AS t

(job_nm, job_base_dt, job_start_ts, job_end_ts, "rows", job_status, query, err_msg)

VALUES (v_job_nm, v_date, clock_timestamp(), NULL, NULL, 'R', NULL, NULL)

ON CONFLICT (job_nm, job_base_dt) DO UPDATE

SET job_start_ts = clock_timestamp()

, job_end_ts = NULL

, ROWS = NULL

, job_status = 'R'

, query = NULL

, err_msg = NULL

;

COMMIT; -- Job이 수행되는 동안 'R', Running으로 변경 - 다른 세션에서 public.tb_sp_job_log 상태 확인


BEGIN

---------------- Job Logic Start -------------------

DELETE FROM public.order_log

WHERE order_dt = v_date;

v_sql = '

INSERT INTO public.order_log

(order_no, cust_no, prod_nm, order_dt)

VALUES(0, 0, ''prod_01'', '''||v_date||''' ) ';

EXECUTE v_sql;

GET DIAGNOSTICS v_rows := row_count; -- 처리 건수

SELECT pg_sleep(5) INTO v_tmp; --상태 확인을 위하여 5초 sleep 적용

---------------- Job Logic End -------------------

UPDATE public.tb_sp_job_log

SET job_end_ts=clock_timestamp(), ROWS=v_rows

, job_status = 'S', query = v_sql, err_msg=NULL

WHERE job_nm = v_job_nm

AND job_base_dt = v_date;

EXCEPTION

WHEN OTHERS THEN

GET stacked DIAGNOSTICS

v_err_cd = returned_sqlstate,

v_err_msg = message_text,

v_err_context = pg_exception_context;

RAISE NOTICE E'Got exception:

err_cd : %

err_msg : %

err_context: %', v_err_cd, v_err_msg, v_err_context;

UPDATE public.tb_sp_job_log

SET job_end_ts=clock_timestamp(), ROWS=NULL, job_status = 'F'

, query = v_sql, err_msg=v_err_msg

WHERE job_nm = v_job_nm

AND job_base_dt = v_date;

END;


END

$$

;

------ 초기 수행 시 ------

--수행전 로그 삭제

TRUNCATE TABLE public.order_log;

TRUNCATE TABLE public.tb_sp_job_log;


-- 프로시저 Job 수행

CALL public.sp_test_upsert('20010101');


-- 수행되는 동안 Job 실행 확인(현재 소스상에는 5초로 설정되어 있어서 5초 안에 다른 창에서 확인 가능)

SELECT * FROM public.tb_sp_job_log;

job_nm |job_base_dt|job_start_ts |job_end_ts|rows|job_status|query|err_msg|

--------------+-----------+-----------------------+----------+----+----------+-----+-------+

sp_test_upsert|20010101 |2024-08-15 05:47:08.489| | |R | | |


-- Job 수행 완료시 실행결과 확인

SELECT * FROM public.tb_sp_job_log;

job_nm |job_base_dt|job_start_ts |job_end_ts |rows|job_status|query |err_msg|

--------------+-----------+-----------------------+-----------------------+----+----------+---------------------------------+-------+

sp_test_upsert|20010101 |2024-08-15 05:47:08.489|2024-08-15 05:48:05.169| 1|S |¶INSERT INTO public.order_log... | |


--적재 작업 확인

SELECT * FROM public.order_log;

order_no|cust_no|prod_nm|order_dt|

--------+-------+-------+--------+

0| 0|prod_01|20010101|


------ 재작 수행 시 ------

-- 프로시저 Job 수행

CALL public.sp_test_upsert('20010101');


-- Job 수행 완료시 실행결과 확인 -- 기존 데이터에서 update 됨.

SELECT * FROM public.tb_sp_job_log;

job_nm |job_base_dt|job_start_ts |job_end_ts |rows|job_status|query |err_msg|

--------------+-----------+-----------------------+-----------------------+----+----------+---------------------------------+-------+

sp_test_upsert|20010101 |2024-08-15 06:06:31.139|2024-08-15 06:06:36.170| 1|S |¶INSERT INTO public.order_log... | |


2024년 8월 18일 일요일

Greenplum 7에서 프로시져 함수 및 프로시져 트랜잭션 처리

Greenplum 6에서는 프로시저를 위해서 함수 형태로 지원하지만,

Greenplum 7에서는 프로시저 형태의 함수 및 프로시저 둘다 지원


--1.테스트를 위한 테이블 생성

DROP TABLE IF EXISTS public.test_trx;


CREATE TABLE public.test_trx (id int)

DISTRIBUTED BY (id);


--Unique 에러 발생을 위하여 Unique Index 생

CREATE UNIQUE INDEX ixu_test_trx ON public.test_trx(id);


DROP TABLE IF EXISTS public.test_job_log;

CREATE TABLE public.test_job_log (id int, job_flag TEXT, err_msg text)

DISTRIBUTED BY (id);


--2.1 테스트용 프로시저 함수 - (All or Nothing)

--프로시저 수행시 에러 발생시 전체 롤백 케이스


DELETE FROM public.test_trx;

DELETE FROM public.test_job_log;


CREATE OR REPLACE FUNCTION control_trasaction()

RETURNS TEXT

AS

$$

DECLARE

v_err_msg text;

v_err_cd TEXT;

v_err_context TEXT;

BEGIN

DELETE FROM public.test_trx;

DELETE FROM public.test_job_log;

INSERT INTO public.test_trx values(1);

INSERT INTO public.test_trx values(1); --에러 강제 발생

INSERT INTO public.test_job_log VALUES (1, 'S', NULL);

RETURN 'OK';


EXCEPTION

WHEN OTHERS THEN

GET stacked DIAGNOSTICS

v_err_cd = returned_sqlstate,

v_err_msg = message_text,

v_err_context = pg_exception_context;

RAISE NOTICE E'Got exception:

err_cd : %

err_msg : %

err_context: %', v_err_cd, v_err_msg, v_err_context;

INSERT INTO public.test_job_log VALUES (1, 'F', v_err_msg);

RETURN 'ERROR';

END

$$

LANGUAGE plpgsql ;


--프로시저 함수 호출

SELECT control_trasaction();


--Output 메시지

PL/pgSQL function control_trasaction() line 12 at SQL statement

Got exception:

err_cd : 23505

err_msg : duplicate key value violates unique constraint "ixu_test_trx"

(seg1 172.16.65.90:6001 pid=21436)

err_context: SQL statement "INSERT INTO public.test_trx values(1)"

PL/pgSQL function control_trasaction() line 10 at SQL statement


--데이터 중복 건으로 Unique 에러로 인하여, 전체 롤백 됨.

--에러로 인하여 트랜잭션이 롤백되고, 0건 입력

SELECT * FROM public.test_trx ORDER BY id;

id|

--+


--예외 처리로 에러 원인 적재

SELECT * FROM public.test_job_log;

id|job_flag|err_msg |

--+--------+--------------------------------------------------------------+

1|F |duplicate key value violates unique constraint "ixu_test_trx"

(seg1 172.16.65.90:6001 pid=21436)|


--2.2. 트랜잭션 부분 처리

--여러 단계의 작업 처리시, 에러 발생하더라도 특정 구간까지 수행한 작업이 완료되어야 할 경우

--서브 트랜잭션으로 단위 작업까지 완료가 필요한 경우, 트랜잭션 Block(Begin/END)으로 묶음.

DELETE FROM public.test_trx;

DELETE FROM public.test_job_log;

CREATE OR REPLACE function control_trasaction()

RETURNS TEXT

AS

$$

DECLARE

v_err_msg text;

v_err_cd TEXT;

v_err_context TEXT;

BEGIN


BEGIN

DELETE FROM public.test_trx;

DELETE FROM public.test_job_log;

INSERT INTO public.test_trx values(1);

END;

BEGIN

INSERT INTO public.test_trx values(1);

INSERT INTO public.test_job_log VALUES (1, 'S', NULL);

RETURN 'OK';

EXCEPTION

WHEN OTHERS THEN

GET stacked DIAGNOSTICS

v_err_cd = returned_sqlstate,

v_err_msg = message_text,

v_err_context = pg_exception_context;

RAISE NOTICE E'Got exception:

err_cd : %

err_msg : %

err_context: %', v_err_cd, v_err_msg, v_err_context;


INSERT INTO public.test_job_log VALUES (1, 'F', v_err_msg);

RETURN 'ERROR';

END;


END

$$

LANGUAGE plpgsql ;


--프로시저 함수 호출

SELECT control_trasaction();

--Output 메시지

Got exception:

err_cd : 23505

err_msg : duplicate key value violates unique constraint "ixu_test_trx"

(seg1 172.16.65.90:6001 pid=21436)

err_context: SQL statement "INSERT INTO public.test_trx values(1)"

PL/pgSQL function control_trasaction() line 15 at SQL STATEMENT

--SELECT control_trasaction(); 수행 결과

control_trasaction|

------------------+

ERROR |


--서브 트랜잭션 Block까지 처리되고, 이후에 에러 발생

--첫번째 트랜잭션 Block까지 처리 됨.

SELECT * FROM public.test_trx ORDER BY id;

id|

--+

1|


--2번째 트랜잭션 Block에서 Unique 에러 발생하여 예외 처리 적용 됨.

SELECT * FROM public.test_job_log;

id|job_flag|err_msg |

--+--------+--------------------------------------------------------------+

1|F |duplicate key value violates unique constraint "ixu_test_trx"

(seg4 172.16.65.133:6000 pid=1937134)|



--2.3. 트랜잭션 부분 처리를 이용한 정상적인 처리

--각 트랜잭션 블락을 처리하고, 예외 처리없이 정상 수행

DELETE FROM public.test_trx;

DELETE FROM public.test_job_log;

CREATE OR REPLACE FUNCTION control_trasaction()

RETURNS TEXT

AS

$$

DECLARE

v_err_msg text;

v_err_cd TEXT;

v_err_context TEXT;

BEGIN


BEGIN

DELETE FROM public.test_trx;

DELETE FROM public.test_job_log;

INSERT INTO public.test_trx values(1);

END;

BEGIN

INSERT INTO public.test_trx values(2);

INSERT INTO public.test_job_log VALUES (2, 'S', NULL);

RETURN 'OK';

EXCEPTION

WHEN OTHERS THEN

GET stacked DIAGNOSTICS

v_err_cd = returned_sqlstate,

v_err_msg = message_text,

v_err_context = pg_exception_context;

RAISE NOTICE E'Got exception:

err_cd : %

err_msg : %

err_context: %', v_err_cd, v_err_msg, v_err_context;


INSERT INTO public.test_job_log VALUES (2, 'F', v_err_msg);

RETURN 'ERROR';

END;

END

$$

LANGUAGE plpgsql;


--프로시저 함수 호출 -> 정상 처리

SELECT control_trasaction();

control_trasaction|

------------------+

OK |


--정상으로 처리 됨.

SELECT * FROM public.test_trx ORDER BY id;

id|

--+

1|

2|


--Job 로그도 정상처리

SELECT * FROM public.test_job_log;

id|job_flag|err_msg|

--+--------+-------+

2|S | |

--3.Greenplum 7에서의 프로시저 테스트

--프로시저 수행시 리턴값이 없음.


DELETE FROM public.test_trx;

DELETE FROM public.test_job_log;

CREATE OR REPLACE PROCEDURE control_trasaction()

LANGUAGE plpgsql

AS

$$

DECLARE

v_err_msg text;

v_err_cd TEXT;

v_err_context TEXT;

BEGIN


BEGIN

DELETE FROM public.test_trx;

DELETE FROM public.test_job_log;

INSERT INTO public.test_trx values(1);

END;

BEGIN

INSERT INTO public.test_trx values(2);

INSERT INTO public.test_job_log VALUES (2, 'S', NULL);

EXCEPTION

WHEN OTHERS THEN

GET stacked DIAGNOSTICS

v_err_cd = returned_sqlstate,

v_err_msg = message_text,

v_err_context = pg_exception_context;

RAISE NOTICE E'Got exception:

err_cd : %

err_msg : %

err_context: %', v_err_cd, v_err_msg, v_err_context;


INSERT INTO public.test_job_log VALUES (2, 'F', v_err_msg);

END;

END

$$;


--프로시저 수행시 select 프로시저가 아닌, call 프로시저로 수행

SELECT control_trasaction();

SQL Error [42809]: ERROR: control_trasaction() is a procedure

Hint: To call a procedure, use CALL.

Position: 8


CALL control_trasaction();

-- 정상 처리

SELECT * FROM public.test_trx ORDER BY id;

id|

--+

1|

2|


-- job 로그 정상 처리

SELECT * FROM public.test_job_log;

id|job_flag|err_msg|

--+--------+-------+

2|S | |

Greenplum 7.5에서 Analyze 기능개선

1. analyze 개선 사항 - Analyze 수행 후 테이블에 변경이 없을 경우, Analyze를 자동 skip - 개선 버전: Greenplum 7.5 + 2. Greenplum 버전별 analyzedb, analyze 의 sk...