From a060fa69c536dd38cb3a854c0c0e750852f7f46f Mon Sep 17 00:00:00 2001 From: Joshua Salako Date: Sat, 3 May 2025 15:40:50 +0100 Subject: [PATCH] Refactor data loading and streaming pipeline endpoints for improved file handling - Updated `/load-data` endpoint to make the file parameter optional and added validation for CSV uploads. - Introduced a new dependency function `get_file_if_csv` to streamline file checks when loading data from CSV. - Enhanced `/run/streaming-pipeline` endpoint to utilize the new file handling logic. - Improved code readability by restructuring file renaming logic. --- .../__pycache__/api.cpython-311.pyc | Bin 25064 -> 25512 bytes salary_analytics/api.py | 35 +++++++++++------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/salary_analytics/__pycache__/api.cpython-311.pyc b/salary_analytics/__pycache__/api.cpython-311.pyc index 374ad32cc2419cd20d985fa180ab38af80261fac..af607134f8518e20255f63ad5788722ea8f79abb 100644 GIT binary patch delta 4669 zcmb7HYj9h|6~3!0>B_P!$=1UU`EhN@u@xtFj2%0696OHlw&Oeud58od>t5SRt{$vp zf;~$L^k8efl;z`*TwFy@CR>08d+;d+eJV1mSncEIfJh#Fs~&tn*Gs zrGP{Pg(#wCisl9KIIY*r(fmL@rwy7VS`aAUv{AE0Z2?=<9m$y3zPx9-K^wmo@jZX92m1=(JG>qfy!uA zpo;SanjEbTRCC&@)kM7kFQ;vqFIpR@C4x>+o%0$eNSg1{}$aVLRm z1Fjw0M4#HkGCHqeMNU!&k}}b2=*UU@APIn^Gbd>P$@8M$ureoU1j$v<=>kdCYK6Xt ztYXXbE<<-ts|B=%^;MLBB-numu++^&>0>$cT|xiY`I60bwZuAE61UW)3W9-fQH-=JZ=; z3-R8ll_?C5so?`bC6o#UwRlKT723__^1Dc5dZ|U!lLppouXEvmnSx|0L{sC5pnAUw zhWpb|dxj7n+gMa5_oA^i2se$dr}21-Ze>ptwUS15zUYVE!CPoq#?+9O8Vhn~=ny;U zXaNU);dn=9#01b{srp4LpnMu?33Nd`9#@eP2g;);9y4 z)B(sc<>A;uv5+<~sRj?I69?l|q5GgwW0ySj>CxhZNOrQ>vUcMx5YpYu} z*yr&z+>IKybA;}N)|?0IoTrL}*!!M8lY7!XEkCLQd#kJ37VJ$`E&ea|-mdcgU-k;+ zN}{l3GTM`??IPLDnte6@#Yn>E>*De7$ZnBczerUGDK{<$Ezz`-K>qC$Q zMNY+Kj8ewpLyxn@l?}%2*vrR)D@V3<0h!6;Y2XSEB5xdzK}ZKEUyrL$`57)w#Wl!( zHs8E58=IW}iK8{`Z8kK;;oWsNG0eu0RUP#~5by(*MVHPXjnT)G0`1Og@z@`Yyox_Re;&Xo?JGqzDR736CvvUVhh6Yw&K;bJ2s z@j&<#{*DM&rGgW#Z%iMVelB%n`Zre3S@BY9=#q8#vUPaII(*4AJezMj($9|!_DOds ziLk$QFVfBAv)?XnWyg9d2w^YvY+z>#Z0xI^Q8>}6x0L7TsV|Br8{lKTN2y0-l{(;D0a52x6fc# z;Ez4f+b5E9h3sd&rKE`cWW8T>OiApEQfInkO(Bt-Q_@L6aj|86E^umnpI0n6=Vn@8 zyT3SvYhwPHvfOx>PuY|b?D?YZTq%<`RkF|eR!U`jEDt-pX=lCiAG<5%$~9$qehlj# z@cfYdu>A!2_S4|`S8HDuE0jw1opnQos?-4Nn#)Vdn0@{BTx9i2rW{jb5`0vOCe4BX zH01D*6Ef$Ta!ol7aPLnL`pJ}}@Zgt2@DQ&&?$sbU1IUE(2_6V4d$Vh%CD|oGYYPg((tfg?A$-|*kcudY1huP~}`&@Tmw-$s}gyjf6 z=Gf*XdszFnM+RIV$e8(XK|Y=h1!63IFk^zMHj;|KS%UR1d)Vmr$0MkJPWICF`g~q^U%`M<-Y#ulTP~irTYx@au3sxl=VcLTFG1SJ>H1zz zf3xt8zR$GDs(a6(1Li%exxd)(UP*6jf05yW#R#GcMFtREC>D|SxM7eB&F%q7d|&D% zzhI zMK}@ZR%{(YR-cH&&e?~22th%JBP0><`a>T=co>17{dj`XHUwN*^FL%Nk!%9^yQvFq zljEA&O+N<~)}4^-F|fjYuBKh&lCblC|4(n-_Ybjlwr0_rcfNV&yS}#q=L2)X64J%t zxjz0mJNDY-Oye3B49T7A$R_gU+IO9AZ9czwPS6uOhtppEIlKF{{WD8@ugR~YEbMTo zGHXWgWF`Oh2bK95zt0CTOBG6m-tIKV^T}QW&W~oxxbwAIci1N z>34EESFH?J`Kvgcr@F$`{%RtKf?_%0&V&VY4J#%UU6vEItFpQUA_yKrn9kHu}ZrMxE9Wh0@u3AJquhLaGOylRV$6` zccM$XWmQrOl5)wV-MT9AfaEAhwyjF)LGp~`(YCKj8bR_QjOhRgpS8hzbE4#CrE^S_ zH_d6*<)CZTM+^8E)3~&^Wh5wMfMZ&5nst%`O`BX672uJg1#W4F6qF)K|)t9f2@ zFNlzLc+x+4d&ZC4sdSI&XG%sf~T7;YFmhweW2|~R_i_y&DG$5W+Rvo+^iyN zNLPhq6{A@Pbq#59DeHQ-3rL?wPis(w^+W3rJOC*v6iLu}WH%seMBt{4NO?K14~<9C z{=CWlY!k^|Gm4Fv9Hfg)nmTjPETxYpf;2H1V}UYz@|x-Q64}6BG1uDAT`Ffq8Bp#~ zVA|W0e>0~Daj|y`YMtFURS&`%?`Wi<(P)DDnAOrs8rW{jORk|?j4~Zpf@F>`aIdR3q{=se8vuT)KSsLleZIv{RliC8lb)_ zQl9MHK*w?CoKDKa$qPn;>d9#(Frl2Bic*;#NxtTIN+Ji?NXZu6L3DD6JyB9$b{CSL zJNF+-s_XqY;`0_jzXICTnOSpb1v$p{mHw5CBzwxviuwbfp7!o%W@jP$m))VKD^nBa zhA)_U!0GzGrhe2}PDa_wPMrEXm1caR{p#(BxB1zTVAwlt6uB%wP0OMyj_rtpC_Rq_$hooo2V{~O( zO$izdMFQcdtf+BHlf9l_YINhEr(>+Rao9eAYt2TpN zX_Puc5oatEQ`AsIX?OZOx5(npS{)s$i(XT!d}Shh_>-ffN_=JW)_f-P94_%(($e;^ zMx4IENc)5A@%C8u1rVpC(fB=d3$t{1c<$(fthZyR`98GZk>McNA}+v{Ph0wf5WMV6 zM4cF%Qx3v;bDq7{QLo#JRxb8w$JpKuAX8a9dOqL`^18_ggf~HX8Fu2}i(H(Ds*v_{ zn!8cF`uW3|$!ETwJ>OYZc^-AVH1$AXs^P~^0{SGXo=h%xo)U|v9ZhsH(gYrrU^o;R zZ^-2JWA;pUji@uRvCU)bMt2zzSz*sEqG!i?CW(Q)=PMNDEVkU!R&*shK$_Dn69WE(% z(ZRg8Zx-c3Hn^v>U?xv4LYt?>#o`4mTyOWOW;7EHmOX z?1pTaHVOjJP>Hiv$eeA)He}Ft$bCL?AU&*ewU#1MVUBFda*F`hH5tMRwzk&7_ev@7<;AgF`bu#ZK+*2z+2IdTj8K z!S{=+-!HCxzqk%?IZM02zAOOD4}L0WZH|8+T-dp65^Tjd_+j5g--VqYSPCx;@|L5Y z&g0|{1Nb~7l{I>NGBOblO)J#QD)t?*HKTSDLJLAGf{V@XbCJ8)OZ)CSZ3RKf$gKk0 zEEfe*Bs!Hcz}GmI2*F``2CevMt2q%3MN)b>MENO;hjPjg4o)Z_iPJ}M++zrjBYYF# z34|X4qzv$F%K<#Qx>3fPh|zo``MGi!sQ|*=2r58Imx!W4N*9d9l!%;et!{NAK82(8 zqtQr28BM^Q5E+LvndYG*3#%Be%T6Cj$bXyNIlQf!=j9dTpFwyQ;S$2j2zV6IF#Ex9 zA=%B|8g`LQ?9anZu!E)hOYOe~r3-iQ1RQA}!QUBSx%e>i?XO}rU6m}h|Ewc7t7utR z1*c#tm_Km;_WRf!UC$-If8ZnS>BfT|;q^^hcC<-vcuYXQ(Wcq4P5;I=3F%IPbQjV; zCp{&DM&Zq}KEv)>@ol#V%-glbJznkG%{{Gq>a|zgIuKo{*MjJZS3#=H1HezKzn76K7i5sPTDE>zBOfeyN4{y!xfq-aKI5)AVKUHw)MD(Bzugc(72q zR@~Ei&>~&W)q?1{MGK1xJ4lOVqk>bVUgT{ov>9>IM^rkyz z{a6s%*jR{SgF-dz)R6*PDt|@TNR@aj8i^}#i?iE3hD)ft3d9H3XDV{A9!boYt(6LY~ zz?2RuXDl>Md6Qwm`REk{E89C_FSp?kdpxKHX{L7xtaP!I+S%Mlm97Ry!+$qMica&| z;T2tlLN5Zg&M8AQlIh`*mBtn5&{2CQvYJ>FzPBUDA43>Lh#*h|Y#8Vq0`Cm?-PwjL zR^!_4{}Avu44u%3O{&Um^iRM-tqSq0(CREpgpg%XB${Ob!5|PHxhf2OnSaTJqyLsH z%i>1Txhw!Y-p5~;Pq4S{c7{al`NFRiUNyR2-0^DJwbor{3g06G*T}%KsEBBQc-fME bzuKODU-gfqiM9BtB#67oI?MC`e%}2Dd@cpx diff --git a/salary_analytics/api.py b/salary_analytics/api.py index 8172a9b..ed4e7e5 100644 --- a/salary_analytics/api.py +++ b/salary_analytics/api.py @@ -2,7 +2,7 @@ FastAPI application for salary analytics. """ -from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File +from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, Depends from fastapi.responses import FileResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel @@ -242,13 +242,13 @@ async def run_full_pipeline(): raise HTTPException(status_code=500, detail=str(e)) @app.post("/load-data") -async def load_data(source: str = "db", file: UploadFile = None): +async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)): """ Load data from either database or CSV file. Args: source (str): Source of data ('db' or 'csv') - file (UploadFile): CSV file to load (required if source is 'csv') + file (UploadFile, optional): CSV file to load (required if source is 'csv') Returns: dict: Status of data loading @@ -288,15 +288,25 @@ async def load_data(source: str = "db", file: UploadFile = None): logger.error(f"Error loading data: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) +async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)): + """Dependency to handle file upload only when source is csv.""" + if source == 'csv' and not file: + raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") + return file + @app.post("/run/streaming-pipeline", response_model=List[BatchResponse]) -async def run_streaming_pipeline(source: str = "db", file: UploadFile = None, batch_size: int = 10000): +async def run_streaming_pipeline( + source: str = "db", + batch_size: int = 10000, + file: Optional[UploadFile] = Depends(get_file_if_csv) +): """ Run the complete salary analytics pipeline in batches. Args: source (str): Source of data ('db' or 'csv') - file (UploadFile): CSV file to load (required if source is 'csv') batch_size (int): Number of rows to process in each batch + file (UploadFile, optional): CSV file to load (required if source is 'csv') Returns: List[BatchResponse]: List of responses for each batch processed @@ -305,9 +315,6 @@ async def run_streaming_pipeline(source: str = "db", file: UploadFile = None, ba if source not in ['db', 'csv']: raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") - if source == 'csv' and not file: - raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") - # Initialize data loader data_loader = DataLoader() data_loader.chunk_size = batch_size @@ -326,12 +333,14 @@ async def run_streaming_pipeline(source: str = "db", file: UploadFile = None, ba chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date']) chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date']) + # Rename columns chunk = chunk.rename(columns={ - 'd1': 'trx_type', - 'd2': 'trx_subtype', - 'd3': 'initiated_by', - 'd4': 'customer_id' - }) + 'd1': 'trx_type', + 'd2': 'trx_subtype', + 'd3': 'initiated_by', + 'd4': 'customer_id' + }) + chunk = chunk.dropna() return chunk