11import asyncio
22import logging
3+ from typing import Any
34
45import aiohttp
56import async_timeout
@@ -72,10 +73,61 @@ async def update_old_tokens(
7273 sentry_sdk .capture_exception (e )
7374 return False
7475
async def update_old_user_info(
    self, user: User, user_data: dict[str, Any]
) -> bool:
    """Sync the stored profile fields of ``user`` with fetched user data.

    The ``email`` and ``username`` keys of ``user_data`` and the
    ``thumbnail`` key of its nested ``profile`` mapping are compared with
    the attributes of the same name on ``user``. A field is scheduled for
    update only when the fetched value is truthy and the stored value is
    either empty or different.

    Args:
        user: Object whose ``email``, ``username`` and ``thumbnail``
            attributes are compared and, if needed, overwritten and saved.
        user_data: Mapping read via ``.get`` for ``email``, ``username``
            and ``profile`` (``profile`` itself is read for ``thumbnail``).

    Returns:
        True when nothing had to change or the changed fields were saved;
        False when applying, saving or logging the change raised (the
        error is logged and forwarded to Sentry).
    """
    profile_section = user_data.get("profile")

    # (attribute name, fetched value) pairs, in the order they are applied.
    fetched_values = (
        ("email", user_data.get("email")),
        ("username", user_data.get("username")),
        (
            "thumbnail",
            profile_section.get("thumbnail") if profile_section else None,
        ),
    )

    # Keep only fetched values that are present and differ from what is stored.
    pending: dict[str, Any] = {}
    for attr_name, incoming in fetched_values:
        if not incoming:
            continue
        stored = getattr(user, attr_name)
        if not stored or stored != incoming:
            pending[attr_name] = incoming

    # Nothing to write: treat as success without touching the database.
    if not pending:
        return True

    changed_names = list(pending)
    try:
        # Apply every change to the instance, then persist only those fields.
        for attr_name in changed_names:
            setattr(user, attr_name, pending[attr_name])

        await user.asave(update_fields=changed_names)

        logger.info(
            "Updated user profile fields %s for %s",
            changed_names,
            user.velog_uuid,
        )
    except Exception as e:
        logger.error(
            "Failed to update user info: %s (user velog uuid: %s)",
            e,
            user.velog_uuid,
        )
        sentry_sdk.capture_exception(e)
        return False

    return True
126+
75127 async def bulk_upsert_posts (
76128 self ,
77129 user : User ,
78- fetched_posts : list [dict [str , str ]],
130+ fetched_posts : list [dict [str , Any ]],
79131 batch_size : int = 200 ,
80132 ) -> bool :
81133 """Post 객체를 일정 크기의 배치로 나눠서 삽입 또는 업데이트"""
@@ -95,7 +147,7 @@ async def bulk_upsert_posts(
95147 return False
96148
97149 async def _upsert_batch (
98- self , user : User , batch_posts : list [dict [str , str ]]
150+ self , user : User , batch_posts : list [dict [str , Any ]]
99151 ) -> None :
100152 """단일 배치 처리, bulk_upsert_posts 에서 호출됨"""
101153
@@ -209,7 +261,7 @@ def _execute_sync() -> None:
209261 await _execute_sync ()
210262
211263 async def update_daily_statistics (
212- self , post : dict [str , str ], stats : dict [str , str ]
264+ self , post : dict [str , Any ], stats : dict [str , Any ]
213265 ) -> None :
214266 """PostDailyStatistics를 업데이트 또는 생성 (upsert)"""
215267 if not stats or not isinstance (stats , dict ):
@@ -222,17 +274,17 @@ async def update_daily_statistics(
222274 today = get_local_now ().date ()
223275 post_id = post ["id" ]
224276
225- stats_data = stats .get ("data" , {}) # type: ignore
277+ stats_data = stats .get ("data" , {})
226278 if not stats_data or not isinstance (
227- stats_data .get ("getStats" ), # type: ignore
279+ stats_data .get ("getStats" ),
228280 dict ,
229281 ):
230282 logger .warning (
231283 f"Skip updating statistics due to missing getStats data for post { post_id } "
232284 )
233285 return
234286
235- view_count = stats_data ["getStats" ].get ("total" , 0 ) # type: ignore
287+ view_count = stats_data ["getStats" ].get ("total" , 0 )
236288 like_count = post .get ("likes" , 0 )
237289
238290 # 트랜잭션 내에서 실행
@@ -331,29 +383,50 @@ async def process_user(
331383 origin_access_token = aes_encryption .decrypt (user .access_token )
332384 origin_refresh_token = aes_encryption .decrypt (user .refresh_token )
333385
386+ # ========================================================== #
387+ # STEP1: 토큰이 유효성 체크 및 업데이트. 이후 사용자 정보 업데이트
388+ # ========================================================== #
389+
334390 # 토큰 유효성 검증
335391 new_user_cookies , user_data = await fetch_velog_user_chk (
336392 session ,
337393 origin_access_token ,
338394 origin_refresh_token ,
339395 )
340- if not (user_data or new_user_cookies ):
341- return
342396
343- if user_data ["data" ]["currentUser" ] is None : # type: ignore
397+ if (
398+ not (user_data or new_user_cookies )
399+ or user_data .get ("data" , {}).get ("currentUser" ) is None
400+ ):
344401 logger .warning (
345402 f"Failed to fetch user data because of wrong tokens. (user velog uuid: { user .velog_uuid } )"
346403 )
347404 return
348405
349406 if new_user_cookies :
350- await self .update_old_tokens (
407+ user_token_result = await self .update_old_tokens (
351408 user ,
352409 aes_encryption ,
353410 new_user_cookies ,
354411 )
412+ if not user_token_result :
413+ raise Exception ("Failed to update tokens, Check the logs" )
414+ origin_access_token = new_user_cookies ["access_token" ]
415+ origin_refresh_token = new_user_cookies ["refresh_token" ]
416+
417+ # velog 응답과 기존 저장된 사용자 정보 비교 및 업데이트
418+ # user_data -> currentUser 에는 id / username / email / profile { thumbnail } 존재
419+ user_info_result = await self .update_old_user_info (
420+ user ,
421+ user_data ["data" ]["currentUser" ],
422+ )
423+ if not user_info_result :
424+ raise Exception ("Failed to update user_info, Check the logs" )
355425
356- username = user_data ["data" ]["currentUser" ]["username" ] # type: ignore
426+ # ========================================================== #
427+ # STEP2: 게시물 전체 목록을 가져와서 upsert 와 상태 동기화 (비활성, 활성)
428+ # ========================================================== #
429+ username = user_data ["data" ]["currentUser" ]["username" ]
357430 fetched_posts = await fetch_all_velog_posts (
358431 session , username , origin_access_token , origin_refresh_token
359432 )
@@ -370,6 +443,9 @@ async def process_user(
370443 user , all_post_ids , min_posts_threshold = 1
371444 )
372445
446+ # ========================================================== #
447+ # STEP3: 게시물 전체 목록을 기반으로 세부 통계 가져와서 upsert
448+ # ========================================================== #
373449 # 게시물을 적절한 크기의 청크로 나누어 처리
374450 chunk_size = 20
375451 for i in range (0 , len (fetched_posts ), chunk_size ):
0 commit comments