Help with avoiding redistribution using a hash index
01 Dec 2014

Hi,

 

I am working on a query that combines multiple rows with overlapping date ranges for the same information, using PARTITION BY and RESET WHEN. However, even though the table has a primary index, a hash index with ORDER BY, and collected statistics on the PARTITION BY columns, the data gets redistributed twice across all AMPs and the query takes more than an hour to complete.

The table has around 200 million records. The first query sets the start date of all records sharing the same party_id, Source_Name_Txt and Source_Address_Txt within an unbroken interval (RESET WHEN is used to detect the breaks) to the MIN(party_xref_start_dt) of that interval. I would later use an outer query to group and combine all records with the same start date and associated values (see the sketch after Query 1 below).

The second query uses the TD_NORMALIZE_MEET table function, combines the records in the same step, and runs in less than 15 minutes. However, I cannot use TD_NORMALIZE_MEET because I also need additional attributes such as the minimum ins_audit_id of the combined records, which TD_NORMALIZE_MEET does not support.

Is there any way to avoid the redistribution in the query below and get the hash index used in the execution plan?

 

 

Thanks

Suma

 

Table cdw_sandbox.suma_xref_prep4 has around 185 million records. After collapsing unbroken intervals with the same data, it has 175 million records.

 

 

Query 1: using ordered analytical functions; the data is redistributed twice and the query takes more than an hour:

SELECT party_id, Source_Name_Txt, Source_Address_Txt,
       COALESCE(MIN(party_xref_start_dt) OVER
                  (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                   ORDER BY party_xref_start_dt
                   RESET WHEN party_xref_start_dt > MAX(party_xref_end_dt) OVER
                                (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                                 ORDER BY party_xref_start_dt
                                 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)
                   ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
                party_xref_start_dt) AS party_xref_start_dt,
       party_xref_end_dt
FROM cdw_sandbox.suma_xref_prep4
WHERE party_xref_start_dt <> party_xref_end_dt;
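
For reference, the outer query I mentioned above would look roughly like this (a sketch, not my exact SQL; ins_audit_id is added to the inner select list here to show how its minimum would be taken per combined interval):

SELECT party_id, Source_Name_Txt, Source_Address_Txt,
       party_xref_start_dt,
       MAX(party_xref_end_dt) AS party_xref_end_dt,
       MIN(ins_audit_id)      AS ins_audit_id
FROM
 (
  SELECT party_id, Source_Name_Txt, Source_Address_Txt, ins_audit_id,
         COALESCE(MIN(party_xref_start_dt) OVER
                    (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                     ORDER BY party_xref_start_dt
                     RESET WHEN party_xref_start_dt > MAX(party_xref_end_dt) OVER
                                  (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                                   ORDER BY party_xref_start_dt
                                   ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)
                     ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
                  party_xref_start_dt) AS party_xref_start_dt,
         party_xref_end_dt
  FROM cdw_sandbox.suma_xref_prep4
  WHERE party_xref_start_dt <> party_xref_end_dt
 ) AS a
GROUP BY 1, 2, 3, 4;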

Query 2: the same functionality with the TD_NORMALIZE_MEET table function and LOCAL ORDER BY; runs in under 15 minutes:

WITH Prod (party_id, Source_Name_Txt, Source_Address_Txt, Period_Dt) AS
(
  SELECT party_id, Source_Name_Txt, Source_Address_Txt,
         PERIOD(Party_xref_Start_Dt, Party_xref_End_Dt) AS Period_Dt
  FROM cdw_sandbox.suma_xref_prep4
  WHERE Party_xref_Start_Dt < Party_xref_End_Dt
)
SELECT party_id, Source_Name_Txt, Source_Address_Txt,
       Period_Dt,
       Nrm_Count
FROM TABLE (TD_NORMALIZE_MEET
             (NEW VARIANT_TYPE(Prod.party_id, Prod.Source_Name_Txt, Prod.Source_Address_Txt),
              Prod.Period_Dt)
            RETURNS (Party_Id DECIMAL(18,0),
                     Source_Name_Txt VARCHAR(500),
                     Source_Address_Txt VARCHAR(500),
                     Period_Dt PERIOD(DATE),
                     Nrm_Count INTEGER)
            HASH BY party_id, Source_Name_Txt, Source_Address_Txt
            LOCAL ORDER BY party_id, Source_Name_Txt, Source_Address_Txt, Period_Dt
           ) AS A;

Table information:

SHOW TABLE cdw_sandbox.suma_xref_prep4;
CREATE SET TABLE cdw_sandbox.suma_xref_prep4 ,NO FALLBACK ,
     NO BEFORE JOURNAL,
     NO AFTER JOURNAL,
     CHECKSUM = DEFAULT,
     DEFAULT MERGEBLOCKRATIO
     (
      Party_Id DECIMAL(18,0),
      Party_Xref_Type_Cd BYTEINT,
      Party_Xref_Start_Dt DATE FORMAT 'YY/MM/DD',
      Source_Account_Nbr DECIMAL(9,0),
      Source_Sub_Account_Nbr DECIMAL(5,0),
      Source_Email_Address_Txt VARCHAR(500) CHARACTER SET LATIN NOT CASESPECIFIC,
      Source_Email_Key DECIMAL(10,0),
      Source_Name_Txt VARCHAR(500) CHARACTER SET LATIN NOT CASESPECIFIC,
      Source_Address_Txt VARCHAR(500) CHARACTER SET LATIN NOT CASESPECIFIC,
      Source_Mobile_Phone_Num DECIMAL(18,0),
      next_row_start_dt DATE FORMAT 'YY/MM/DD',
      party_xref_end_dt DATE FORMAT 'YY/MM/DD',
      INS_AUDIT_ID INTEGER,
      Updt_Audit_Id INTEGER)
PRIMARY INDEX ( Party_Id ,Source_Name_Txt ,Source_Address_Txt );

Hash Index:

CREATE HASH INDEX cdw_sandbox.hash_12
  (party_id, Source_Name_Txt, Source_Address_Txt, Party_xref_Start_Dt, Party_xref_End_Dt)
ON cdw_sandbox.suma_xref_prep4
BY (party_id, Source_Name_Txt, Source_Address_Txt)
ORDER BY VALUES (Party_xref_Start_Dt);

Stats:

COLLECT STATISTICS ON cdw_sandbox.suma_xref_prep4 INDEX (party_id, Source_Name_Txt, Source_Address_Txt);

Explain of query 1:

Explain
SELECT party_id, Source_Name_Txt, Source_Address_Txt,
       COALESCE(MIN(party_xref_start_dt) OVER
                  (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                   ORDER BY party_xref_start_dt
                   RESET WHEN party_xref_start_dt > MAX(party_xref_end_dt) OVER
                                (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                                 ORDER BY party_xref_start_dt
                                 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)
                   ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
                party_xref_start_dt) AS party_xref_start_dt,
       party_xref_end_dt
FROM cdw_sandbox.suma_xref_prep4
WHERE party_xref_start_dt <> party_xref_end_dt

  1) First, we lock a distinct cdw_sandbox."pseudo table" for read on a
     RowHash to prevent global deadlock for cdw_sandbox.suma_xref_prep4.
  2) Next, we lock cdw_sandbox.suma_xref_prep4 for read.
  3) We do an all-AMPs RETRIEVE step from cdw_sandbox.suma_xref_prep4
     by way of an all-rows scan with a condition of (
     "cdw_sandbox.suma_xref_prep4.Party_Xref_Start_Dt <>
     cdw_sandbox.suma_xref_prep4.party_xref_end_dt") into Spool 3
     (all_amps), which is built locally on the AMPs.  The input table
     will not be cached in memory, but it is eligible for synchronized
     scanning.  The result spool file will not be cached in memory.
     The size of Spool 3 is estimated with no confidence to be
     167,160,741 rows (61,347,991,947 bytes).  The estimated time for
     this step is 35.54 seconds.
  4) We do an all-AMPs STAT FUNCTION step from Spool 3 (Last Use) by
     way of an all-rows scan into Spool 6 (Last Use), which is built
     locally on the AMPs.  The result rows are put into Spool 4
     (all_amps), which is built locally on the AMPs.  The size is
     estimated with no confidence to be 167,160,741 rows (
     232,854,912,213 bytes).
  5) We do an all-AMPs RETRIEVE step from Spool 4 (Last Use) by way of
     an all-rows scan into Spool 2 (all_amps), which is built locally
     on the AMPs.  The result spool file will not be cached in memory.
     The size of Spool 2 is estimated with no confidence to be
     167,160,741 rows (63,353,920,839 bytes).  The estimated time for
     this step is 26.76 seconds.
  6) We do an all-AMPs STAT FUNCTION step from Spool 2 (Last Use) by
     way of an all-rows scan into Spool 12 (Last Use), which is
     redistributed by hash code to all AMPs.  The result rows are put
     into Spool 10 (all_amps), which is built locally on the AMPs.  The
     size is estimated with no confidence to be 167,160,741 rows (
     232,854,912,213 bytes).
  7) We do an all-AMPs RETRIEVE step from Spool 10 (Last Use) by way of
     an all-rows scan into Spool 1 (all_amps), which is built locally
     on the AMPs.  The result spool file will not be cached in memory.
     The size of Spool 1 is estimated with no confidence to be
     167,160,741 rows (63,688,242,321 bytes).  The estimated time for
     this step is 26.76 seconds.
  8) We do an all-AMPs STAT FUNCTION step from Spool 1 (Last Use) by
     way of an all-rows scan into Spool 19 (Last Use), which is
     redistributed by hash code to all AMPs.  The result rows are put
     into Spool 15 (group_amps), which is built locally on the AMPs.
     The size is estimated with no confidence to be 167,160,741 rows (
     232,854,912,213 bytes).
  9) Finally, we send out an END TRANSACTION step to all AMPs involved
     in processing the request.
  -> The contents of Spool 15 are sent back to the user as the result
     of statement 1.

 

 

dnoeth
01 Dec 2014

You can get a similar result to TD_NORMALIZE_MEET using only 2 STAT steps instead of 3. Is the min(ins_audit_id) the value from the starting row?
A Join Index might be better than a Hash Index (see the sketch below).
But the main reason it's slow is probably the VARCHAR(500) columns used in the ORDER BY (they are expanded to CHAR for sorting). If you know the actual lengths you might cast them to shorter VARCHARs.
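
A single-table join index along these lines might do (just a sketch; the index name is a placeholder and the column list covers only what the query above touches):

CREATE JOIN INDEX cdw_sandbox.ji_xref_prep4 AS
SELECT party_id, Source_Name_Txt, Source_Address_Txt,
       Party_Xref_Start_Dt, party_xref_end_dt, INS_AUDIT_ID
FROM cdw_sandbox.suma_xref_prep4
ORDER BY Party_Xref_Start_Dt
PRIMARY INDEX (party_id, Source_Name_Txt, Source_Address_Txt);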
 

Dieter

01 Dec 2014

Thank you. I changed the type of Source_Name_Txt to VARCHAR(30) and Source_Address_Txt to VARCHAR(95), and created a join index. The query completed in 1 hour 25 minutes, and the explain plan looks the same.
The min(ins_audit_id) is indeed the value from the starting row. Can you please elaborate on how to get this done with only 2 STAT steps instead of 3?
Also, can someone explain why the data gets redistributed to all AMPs twice when the table's primary index is the same as the PARTITION BY columns? Is there any way to avoid this? 60-80% of the query execution time is spent in step 6 of the explain plan.
 

dnoeth
02 Dec 2014

What's your TD release? I've seen Explains with multiple nested OLAP functions resulting in STAT steps with local spools.
I did some search & replace on an existing query; this should be correct:

SELECT party_id, Source_Name_Txt, Source_Address_Txt, ins_audit_id,
  party_xref_start_dt, 
  COALESCE(MIN(prev_end_time) -- get the end time of the next row (or the max end time)
           OVER (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
                 ORDER BY party_xref_start_dt
                 ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING)
           ,max_end_time) AS End_time
FROM
 (
  SELECT party_id, Source_Name_Txt, Source_Address_Txt, ins_audit_id,
    party_xref_start_dt,
    MAX(party_xref_end_dt) -- previous end date
    OVER (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt
          ORDER BY party_xref_start_dt
          ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) AS prev_end_time,
    MAX(party_xref_end_dt) -- max end date (needed in the next step)
    OVER (PARTITION BY party_id, Source_Name_Txt, Source_Address_Txt) AS max_end_time
  FROM suma_xref_prep4
  -- find the first row after a gap or the first row within this partition
  QUALIFY party_xref_start_dt > prev_end_time OR prev_end_time IS NULL
 ) AS dt

 

Dieter

08 Dec 2014

Thank you Dieter. I modified the query as you suggested (clever!), changed the varchar columns to VARCHAR(95) for address and VARCHAR(25) for name, created a join index and collected stats on the index columns. Now the query runs in one-third of the CPU time it used to take, uses two STAT steps and does not redistribute the data. The Teradata version is 14.10.

Current explain plan:

  1) First, we lock a distinct cdw_sandbox."pseudo table" for read on a
     RowHash to prevent global deadlock for
     cdw_sandbox.suma_xref_prep_1.
  2) Next, we lock cdw_sandbox.suma_xref_prep_1 for read.
  3) We do an all-AMPs STAT FUNCTION step from
     cdw_sandbox.suma_xref_prep_1 by way of an all-rows scan with no
     residual conditions into Spool 5 (Last Use), which is assumed to
     be redistributed by value to all AMPs.  The result rows are put
     into Spool 3 (all_amps), which is built locally on the AMPs.  The
     size is estimated with high confidence to be 731,330,867 rows (
     293,263,677,667 bytes).
  4) We do an all-AMPs RETRIEVE step from Spool 3 (Last Use) by way of
     an all-rows scan with a condition of ("(Party_Xref_Start_Dt >
     Field_14) OR (Field_14 IS NULL)") into Spool 1 (used to
     materialize view, derived table, table function or table operator
     dt) (all_amps), which is built locally on the AMPs.  The result
     spool file will not be cached in memory.  The size of Spool 1 is
     estimated with high confidence to be 731,330,867 rows (
     107,505,637,449 bytes).  The estimated time for this step is 49.20
     seconds.
  5) We do an all-AMPs STAT FUNCTION step from Spool 1 (Last Use) by
     way of an all-rows scan into Spool 12 (Last Use), which is assumed
     to be redistributed by value to all AMPs.  The result rows are put
     into Spool 8 (group_amps), which is built locally on the AMPs.
     The size is estimated with high confidence to be 731,330,867 rows
     (196,728,003,223 bytes).
  6) Finally, we send out an END TRANSACTION step to all AMPs involved
     in processing the request.
  -> The contents of Spool 8 are sent back to the user as the result of
     statement 1.

Appreciate your help on this. 

I looked up the difference between hash and join indexes at the URL below and understand that a hash index offers only a subset of join index capabilities:

http://www.info.teradata.com/htmlpubs/DB_TTU_14_00/index.html#page/SQL_Reference/B035_1184_111A/Create_Hash_Index-Details.010.006.html

However, I don't understand why a join index avoids the redistribution in this context but a hash index doesn't. I would appreciate it if anyone could explain the reason for this.

 

Thank you

Suma
