@@ -85,31 +85,54 @@ private void messageSender(final TaskType taskType) {
8585 @ Scheduled (fixedRate = FIXED_RATE_FOR_SCHEDULAR_FETCH_TWEETS )
8686 public void fetchTweetsFromTwitterSearch () {
8787 TaskType taskType = TaskType .FETCH_TWEETS_FROM_TWITTER_SEARCH ;
88- sendAndReceive (taskType );
88+ sendAndReceiveTweet (taskType );
8989 }
9090
9191 @ Override
9292 @ Scheduled (fixedRate = FIXED_RATE_FOR_SCHEDULAR_UPDATE_TWEETS )
9393 public void updateTweets () {
9494 TaskType taskType = TaskType .UPDATE_TWEETS ;
95- sendAndReceive (taskType );
95+ sendAndReceiveTweet (taskType );
9696 }
9797
9898 @ Override
9999 @ Scheduled (fixedRate = FIXED_RATE_FOR_SCHEDULAR_UPDATE_USER )
100100 public void updateUserProfiles () {
101101 TaskType taskType = TaskType .UPDATE_USER_PROFILES ;
102- sendAndReceive (taskType );
102+ sendAndReceiveUser (taskType );
103103 }
104104
105105 @ Override
106106 @ Scheduled (fixedRate = FIXED_RATE_FOR_SCHEDULAR_UPDATE_USER_BY_MENTION )
107107 public void updateUserProfilesFromMentions () {
108108 TaskType taskType = TaskType .UPDATE_USER_PROFILES_FROM_MENTIONS ;
109- sendAndReceive (taskType );
109+ sendAndReceiveUser (taskType );
110110 }
111111
112- private void sendAndReceive (TaskType taskType ){
112+ private void sendAndReceiveTweet (TaskType taskType ){
113+ CountedEntities countedEntities = countedEntitiesService .countAll ();
114+ Task task = taskService .create ("Start via MQ" , taskType ,countedEntities );
115+ TaskMessage taskMessage = new TaskMessage (task .getId (), taskType , task .getTimeStarted ());
116+ Message <TaskMessage > mqMessage = MessageBuilder .withPayload (taskMessage )
117+ .setHeader ("task_id" , task .getId ())
118+ .setHeader ("task_uid" , task .getUniqueId ())
119+ .setHeader ("task_type" , task .getTaskType ())
120+ .build ();
121+ MessagingTemplate mqTemplate = new MessagingTemplate ();
122+ Message <?> returnedMessage = mqTemplate .sendAndReceive (startTaskChannel , mqMessage );
123+ Object o = returnedMessage .getPayload ();
124+ countedEntities = countedEntitiesService .countAll ();
125+ if ( o instanceof TweetResultList ){
126+ TweetResultList msg = (TweetResultList ) o ;
127+ long taskId = msg .getTaskId ();
128+ task = taskService .findById (taskId );
129+ taskService .done (task ,countedEntities );
130+ } else {
131+ taskService .error (task ,"Wrong type of returnedMessage" ,countedEntities );
132+ }
133+ }
134+
135+ private void sendAndReceiveUser (TaskType taskType ){
113136 CountedEntities countedEntities = countedEntitiesService .countAll ();
114137 Task task = taskService .create ("Start via MQ" , taskType ,countedEntities );
115138 TaskMessage taskMessage = new TaskMessage (task .getId (), taskType , task .getTimeStarted ());
@@ -136,7 +159,7 @@ private void sendAndReceive(TaskType taskType){
136159 @ Scheduled (fixedRate = FIXED_RATE_FOR_SCHEDULAR_FETCH_USER_LIST )
137160 public void fetchUsersFromDefinedUserList () {
138161 TaskType taskType = TaskType .FETCH_USERS_FROM_DEFINED_USER_LIST ;
139- sendAndReceive (taskType );
162+ sendAndReceiveUser (taskType );
140163 }
141164
142165 @ Override
0 commit comments