介绍

  • 历史拉链表

    • 历史拉链表是一种数据模型,主要针对数据仓库设计中表存储数据的方式而定义的。它记录一个事物从开始到当前状态的所有变化的信息。拉链表可以避免按每一天存储所有记录造成的海量存储问题,同时也是处理缓慢变化数据的一种常见方式。
    • 也就是说,对于表中的任何数据,不进行真正的删除,只记录操作和有效日期。
  • 流程

    • history1
    • 其中,tmp0表有两个分区,表示历史数据和当前数据,使用tmp1tmp0和事实表进行更新和交换。

操作

创建表

  • 首先需要创建delta表,事实表,以及两个tmp表

  • -- 事实表
    create table public.member_fatdt0
    (
        member_id varchar(64),         -- 会员ID
        phoneno varchar(20),           -- 电话号码
        dw_beg_date date,              -- 生效日期
        dw_end_date date,              -- 失效日期
        dtype char(1),                 -- 类型(历史数据,当前数据)
        dw_status char(1),             -- 数据操作类型(I,D,U)
        dw_ins_date date               -- 数据仓库插入日期
    )with(appendonly=true,compresslevel=5)   -- 压缩级别
    distributed by (member_id)
    PARTITION BY RANGE (dw_end_date)
    (
        PARTITION p20111201 START (date '2011-12-01') INCLUSIVE,
        PARTITION p20111202 START (date '2011-12-02') INCLUSIVE,
        PARTITION p20111203 START (date '2011-12-03') INCLUSIVE,
        PARTITION p20111204 START (date '2011-12-04') INCLUSIVE,
        PARTITION p20111205 START (date '2011-12-05') INCLUSIVE,
        PARTITION p20111206 START (date '2011-12-06') INCLUSIVE,
        PARTITION p20111207 START (date '2011-12-07') INCLUSIVE,
        PARTITION p20111231 START (date '2011-12-31') INCLUSIVE
        END (date '3001-01-01') EXCLUSIVE
    );
    
    -- 增量表
    create table public.member_delta
    (
        member_id varchar(64),
        phoneno varchar(20),
        action char(1),                      -- 数据操作类型(I,D,U)
        dw_ins_date date                     -- 类型(新增,删除,更新)
    )with(appendonly=true,compresslevel=5)   -- 压缩级别
    distributed by (member_id)
    
    -- 临时表
    create table public.member_tmp0
    (
        member_id varchar(64),
        phoneno varchar(20),
        dw_beg_date date,
        dw_end_date date,
        dtype char(1),
        dw_status char(1),
        dw_ins_date date
    )with(appendonly=true,compresslevel=5)   -- 压缩级别
    distributed by (member_id)
    PARTITION BY LIST (dtype)
    (
        PARTITION PHIS VALUES ('H'),      -- 表示历史信息
        PARTITION PCUR VALUES ('C'),      -- 表示当前信息
        DEFAULT PARTITION other
    );
      -- 临时表1
    
      create table public.member_tmp1
      (
          member_id varchar(64),
          phoneno varchar(20),
          dw_beg_date date,
          dw_end_date date,
          dtype char(1),
          dw_status char(1),
          dw_ins_date date
      )with(appendonly=true,compresslevel=5)   -- 压缩级别
      distributed by (member_id)
    

history2

插入数据

-- 插入delta表
Insert into member_delta values('mem006','1310000006','I','2011-12-03');
Insert into member_delta values('mem002','1310000002','D','2011-12-03');
Insert into member_delta values('mem003','1310000003','U','2011-12-03');
  • history3
  • 插入事实表

    • Insert into member_fatdt0 values('mem001','1310000001','2011-12-01','3000-12-31','C','I','2011-12-02');
      Insert into member_fatdt0 values('mem002','1310000002','2011-12-01','3000-12-31','C','I','2011-12-02');
      Insert into member_fatdt0 values('mem003','1310000003','2011-12-01','3000-12-31','C','I','2011-12-02');
      Insert into member_fatdt0 values('mem004','1310000004','2011-12-01','3000-12-31','C','I','2011-12-02');
      Insert into member_fatdt0 values('mem004','1310000004','2011-12-01','3000-12-31','C','I','2011-12-02');
      Insert into member_fatdt0 values('mem005','1310000005','2011-12-01','3000-12-31','C','I','2011-12-02');
      
    • history4

数据刷新

  1. 将member_fatdt0表与member_delta左外连接,相关联的历史数据插入到member_tmp0历史分区,反之插入到member_tmp0的当前分区

    • 这里主要处理update和delete操作,若能进行左连接,说明数据有更新,插入历史分区

    • truncate table public.member_tmp0;
      -- 清理临时表
      insert into public.member_tmp0
      (
          member_id,
          phoneno,
          dw_beg_date,
          dw_end_date,
          dtype,
          dw_status,
          dw_ins_date
      )
      select a.member_id,a.phoneno,a.dw_beg_date,
              case when b.member_id is null then a.dw_end_date
              else date '2011-12-02'
              end as dw_end_date,
              case when b.member_id is null then 'C'
              else 'H'
              end as dtype,
              case when b.member_id is null then a.dw_status
              else b.action
              end as dw_status,
              date '2011-12-03'
      from public.member_fatdt0 a
      left join public.member_delta b 
      on a.member_id = b.member_id
      and b.action in('D','U')
      where a.dw_beg_date <= cast('2011-12-02' as date) -1
      and a.dw_end_date > cast('2011-12-02' as date)-1;
      
  • history5
  1. 将member_delta当前数据(更新,插入的新数据)插入到member_tmp0当前分区,end时间为无穷。

    • insert into public.member_tmp0
      (
          member_id,
          phoneno,
          dw_beg_date,
          dw_end_date,
          dtype,
          dw_status,
          dw_ins_date
      )
      select member_id,phoneno,
          cast('2011-12-02' as date),
          cast('3000-12-31' as date),
          'C',
          action,
          cast('2011-12-03' as date)
      from public.member_delta
      where action in ('I','U');
      
  • history6
  1. 将member_tmp0历史数据与member_fatdt0相应分区交换(通过member_tmp1表进行交换)。

    • alter table  member_tmp1 drop constraint member_tmp0_1_prt_phis_check;
      truncate table public.member_tmp1;
      alter table public.member_tmp0 exchange partition for ('H') with table public.member_tmp1;
      alter table public.member_fatdt0 exchange partition for('2011-12-02') with table public.member_tmp1;
      
      • history7
  2. 将member_tmp0当前数据与member_fatdt0相应分区交换(通过member_tmp1表进行交换)。

    • alter table  member_tmp1 drop constraint member_fatdt0_1_prt_p20111202_check
      alter table  member_tmp1 drop constraint member_tmp0_1_prt_pcur_check
      alter table public.member_tmp0 exchange partition for('C') with table public.member_tmp1;
      alter table public.member_fatdt0 exchange partition for('3000-12-31') with table public.member_tmp1;
      
    • history8

results matching ""

    No results matching ""