Join our FREE personalized newsletter for news, trends, and insights that matter to everyone in America

Newsletter
New

How Can I Fix The Opening/closing Of Date Ranges

Card image cap

I have to adapt the rfm metrics logic to bank products, in this case deposits
The business logic explained to me is such:

recency - last date of a positive balance on any of the deposit accounts of a customer

frequency - culminative number of all deposit accounts of a customer

monetary - sum of max balances of all deposit accounts of a customer

the rules for new row inserts and updates:
a new date range start whenever a customer goes from zero balance to positive
lets say I have a customer who had positive balances from 2020-01-08 untill 2020-05-09

on 2020-05-09 all of its deposits balances became 0

he will have an open ended date range [2020-01-08) with recency 2020-05-09
this date range will only close and a new one will be inserted only when he once again comes back from 0

I have been at it for several days, but I just cannot comprehend how can I make it work...

Here is the code I have so far:

-- DROP PROCEDURE feature_store.rfm_deposits_load_inc(date);  
  
CREATE OR REPLACE PROCEDURE feature_store.rfm_deposits_load_inc(IN load_date date)  
 LANGUAGE plpgsql  
AS $procedure$  
DECLARE  
    prev_load_date date := load_date - interval '1 day';  
    insert_cols text;  
    hash_expr text;  
    result_sql text;  
BEGIN  
    -- Step 1: Rollback if already processed  
    if exists (  
        select 1  
        from feature_store.rfm_deposits  
        where date_range >> daterange(prev_load_date, load_date)  
    ) then  
raise notice 'Already calculated for this load_date, rolling back...';  
  
        delete from feature_store.rfm_deposits  
        where date_range >> daterange(prev_load_date, load_date);  
  
        update feature_store.rfm_deposits  
        set date_range = daterange(lower(date_range), null)  
        where prev_load_date <@ date_range and date_range << daterange('9999-12-31'::date, null);  
  
        raise notice 'Rollback complete';  
    end if;  
  
    DROP TABLE IF EXISTS temp_today_balances;  
    CREATE TEMP TABLE temp_today_balances AS  
    SELECT  
        d.customer_hub_id,  
        d.customer_id,  
        d.main_account_hub_id AS account_hub_id,  
        MAX(ab.sumn) AS balance_today  
    FROM l2.deposits d  
    JOIN l2.accounts_balances ab  
      ON ab.account_hub_id = d.main_account_hub_id  
     AND load_date >= lower(ab.date_range)  
     AND (upper(ab.date_range) IS NULL OR load_date < upper(ab.date_range))  
    INNER JOIN l1_dbo.hs_customer_onlinebankdb_active c   
      ON c.hub_id = d.customer_hub_id  
    WHERE d.open_date >= '2020-01-01'   
      AND c.customer_type_id = 1    
    GROUP BY d.customer_hub_id, d.customer_id, d.main_account_hub_id;  
  
    CREATE INDEX ON temp_today_balances(customer_hub_id);  
    ANALYZE temp_today_balances;  
  
  
    DROP TABLE IF EXISTS temp_customer_agg;  
    CREATE TEMP TABLE temp_customer_agg AS  
    SELECT  
        customer_hub_id,  
        MAX(customer_id) AS customer_id,  
        SUM(balance_today) AS total_balance,  
        COUNT(DISTINCT account_hub_id) AS total_accounts,  
        BOOL_OR(balance_today > 0) AS is_active  
    FROM temp_today_balances  
    GROUP BY customer_hub_id;  
  
    CREATE INDEX ON temp_customer_agg(customer_hub_id);  
    ANALYZE temp_customer_agg;  
  
    DROP TABLE IF EXISTS temp_prev_state;  
    CREATE TEMP TABLE temp_prev_state AS  
    SELECT  
        r.customer_hub_id,  
        r.customer_id,  
        r.deposits_recency,  
        r.deposits_frequency,  
        r.deposits_monetary,  
        r.last_max_balance,  
        lower(r.date_range) AS range_start,  
        upper(r.date_range) AS range_end  
    FROM feature_store.rfm_deposits r  
    WHERE prev_load_date >= lower(r.date_range)  
      AND (upper(r.date_range) IS NULL OR prev_load_date < upper(r.date_range));  
  
    CREATE INDEX ON temp_prev_state(customer_hub_id);  
    ANALYZE temp_prev_state;  
  
    DROP TABLE IF EXISTS temp_kv;  
    CREATE TEMP TABLE temp_kv AS  
    SELECT  
        COALESCE(p.customer_hub_id, t.customer_hub_id) AS customer_hub_id,  
        COALESCE(p.account_hub_id, t.account_hub_id) AS account_hub_id,  
        GREATEST(  
            COALESCE((p.value)::numeric, 0),  
            COALESCE(t.balance_today, 0)  
        ) AS max_balance  
    FROM (  
        SELECT ps.customer_hub_id, kv.key::bigint AS account_hub_id, kv.value  
        FROM temp_prev_state ps  
        LEFT JOIN LATERAL jsonb_each_text(COALESCE(ps.last_max_balance, '{}'::jsonb)) kv ON true  
    ) p  
    FULL JOIN temp_today_balances t  
      ON p.customer_hub_id = t.customer_hub_id  
     AND p.account_hub_id = t.account_hub_id;  
  
    DROP TABLE IF EXISTS temp_monetary;  
    CREATE TEMP TABLE temp_monetary AS  
    SELECT  
        customer_hub_id,  
        jsonb_object_agg(account_hub_id::text, max_balance) AS last_max_balance_new,  
        SUM(max_balance) AS deposits_monetary_new  
    FROM temp_kv  
    GROUP BY customer_hub_id;  
  
    CREATE INDEX ON temp_monetary(customer_hub_id);  
    ANALYZE temp_monetary;  
  
    DROP TABLE IF EXISTS temp_transitions;  
    CREATE TEMP TABLE temp_transitions AS  
    SELECT  
        COALESCE(a.customer_hub_id, p.customer_hub_id) AS customer_hub_id,  
        COALESCE(a.customer_id, p.customer_id) AS customer_id,  
        COALESCE(a.total_accounts, p.deposits_frequency, 0) AS deposits_frequency_new,  
        COALESCE(m.deposits_monetary_new, p.deposits_monetary, 0) AS deposits_monetary_new,  
        COALESCE(m.last_max_balance_new, p.last_max_balance, '{}'::jsonb) AS last_max_balance_new,  
        a.is_active AS active_today,  
        p.deposits_recency AS prev_recency,  
        CASE  
          WHEN COALESCE(a.is_active, FALSE) THEN load_date  
          ELSE p.deposits_recency  
        END AS deposits_recency_new  
    FROM temp_prev_state p  
    FULL JOIN temp_customer_agg a ON a.customer_hub_id = p.customer_hub_id  
    LEFT JOIN temp_monetary m ON m.customer_hub_id = COALESCE(a.customer_hub_id, p.customer_hub_id);  
  
    CREATE INDEX ON temp_transitions(customer_hub_id);  
    ANALYZE temp_transitions;  
  
    UPDATE feature_store.rfm_deposits t  
    SET deposits_recency = tr.deposits_recency_new,  
        deposits_monetary = tr.deposits_monetary_new,  
        deposits_frequency = tr.deposits_frequency_new,  
        last_max_balance = tr.last_max_balance_new,  
        last_updated_dwh = now()  
    FROM temp_transitions tr  
    WHERE prev_load_date >= lower(t.date_range)  
      AND (upper(t.date_range) IS NULL OR prev_load_date < upper(t.date_range))  
      AND t.customer_hub_id = tr.customer_hub_id;  
  
    DROP TABLE IF EXISTS temp_new_customers;  
    CREATE TEMP TABLE temp_new_customers AS  
    SELECT  
        a.customer_hub_id,  
        a.customer_id,  
        load_date AS deposits_recency,  
        a.total_accounts AS deposits_frequency,  
        m.deposits_monetary_new AS deposits_monetary,  
        m.last_max_balance_new AS last_max_balance,  
        daterange(load_date, NULL) AS date_range  
    FROM temp_customer_agg a  
    JOIN temp_monetary m USING (customer_hub_id)  
    LEFT JOIN feature_store.rfm_deposits r ON r.customer_hub_id = a.customer_hub_id  
    WHERE a.is_active = TRUE  
      AND r.customer_hub_id IS NULL;  
  
    INSERT INTO feature_store.rfm_deposits (  
        customer_hub_id,  
        customer_id,  
        deposits_recency,  
        deposits_frequency,  
        deposits_monetary,  
        date_range,  
        hash,  
        last_max_balance  
    )  
    SELECT  
        n.customer_hub_id,  
        n.customer_id,  
        n.deposits_recency,  
        n.deposits_frequency,  
        n.deposits_monetary,  
        n.date_range,  
        md5(  
          n.customer_hub_id::text || '^?$%' ||  
          n.customer_id::text || '^?$%' ||  
          n.deposits_recency::text || '^?$%' ||  
          n.deposits_frequency::text || '^?$%' ||  
          n.deposits_monetary::text  
        )::uuid AS hash,  
        n.last_max_balance  
    FROM temp_new_customers n;  
  
    DROP TABLE IF EXISTS temp_prev_balance;  
    CREATE TEMP TABLE temp_prev_balance AS  
    SELECT  
        p.customer_hub_id,  
        SUM((kv.value)::numeric) AS prev_total_balance  
    FROM temp_prev_state p  
    LEFT JOIN LATERAL jsonb_each_text(COALESCE(p.last_max_balance, '{}'::jsonb)) kv ON true  
    GROUP BY p.customer_hub_id;  
  
    DROP TABLE IF EXISTS temp_reactivated;  
    CREATE TEMP TABLE temp_reactivated AS  
    SELECT  
        a.customer_hub_id,  
        a.customer_id,  
        load_date AS deposits_recency,  
        a.total_accounts AS deposits_frequency,  
        m.deposits_monetary_new AS deposits_monetary,  
        m.last_max_balance_new AS last_max_balance,  
        daterange(load_date, NULL) AS date_range  
    FROM temp_customer_agg a  
    JOIN temp_monetary m USING (customer_hub_id)  
    JOIN temp_prev_balance pb USING (customer_hub_id)  
    WHERE a.is_active = TRUE  
      AND COALESCE(pb.prev_total_balance, 0) = 0;  -- True Reactivation Only  
  
    UPDATE feature_store.rfm_deposits t  
    SET date_range = daterange(lower(t.date_range), load_date),  
        last_updated_dwh = now()  
    WHERE prev_load_date >= lower(t.date_range)  
      AND (upper(t.date_range) IS NULL OR prev_load_date < upper(t.date_range))  
      AND t.customer_hub_id IN (SELECT customer_hub_id FROM temp_reactivated);  
  
    SELECT string_agg(quote_ident(column_name), ', ')  
      INTO insert_cols  
    FROM information_schema.columns  
    WHERE table_schema = 'feature_store'  
      AND table_name = 'rfm_deposits'  
      AND column_name NOT IN ('created_dwh','last_updated_dwh');  
  
    SELECT string_agg(  
             'coalesce(' || quote_ident(column_name) || '::text, '''') || ''^?$%''',  
             ' || '  
           )  
      INTO hash_expr  
    FROM information_schema.columns  
    WHERE table_schema = 'feature_store'  
      AND table_name = 'rfm_deposits'  
      AND column_name NOT IN ('customer_hub_id','customer_id','date_range','hash','created_dwh','last_updated_dwh');  
  
    result_sql := format($$DROP TABLE IF EXISTS temp_inserted;  
        CREATE TEMP TABLE temp_inserted AS  
        SELECT  
            r.*,  
            md5(%s)::uuid AS hash  
        FROM temp_reactivated r;$$, hash_expr);  
    EXECUTE result_sql;  
  
    EXECUTE format($$INSERT INTO feature_store.rfm_deposits (%s)  
                    SELECT %s FROM temp_inserted;$$,  
                    insert_cols, insert_cols);  
  
    ANALYZE feature_store.rfm_deposits;  
  
    RAISE NOTICE 'RFM Deposits incremental load complete for %', load_date;  
END;  
$procedure$  
;  

Appreciate any tips