Skip to content

Commit e3f96b4

Browse files
working on #138
1 parent 0d725d7 commit e3f96b4

File tree

7 files changed

+171
-9
lines changed

7 files changed

+171
-9
lines changed

src/main/java/org/woehlke/twitterwall/frontend/controller/TestController.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ public TestController(UserService userService, FetchUsersFromDefinedUserList fet
9090
@Async
9191
protected void startOnListRenew(){
9292
String msg = "startOnListRenew: ";
93-
log.info(msg+"START scheduledTasksFacade.fetchUsersFromDefinedUserList: ");
94-
fetchUsersFromDefinedUserList.fetchUsersFromDefinedUserList();
95-
log.info(msg+"DONE scheduledTasksFacade.fetchUsersFromDefinedUserList: ");
93+
log.info(msg+"START startTask.fetchUsersFromDefinedUserList: ");
94+
startTask.fetchUsersFromDefinedUserList();
95+
log.info(msg+"DONE startTask.fetchUsersFromDefinedUserList: ");
9696
}
9797
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.woehlke.twitterwall.scheduled.mq.endoint;
2+
3+
import org.springframework.messaging.Message;
4+
import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage;
5+
import org.woehlke.twitterwall.scheduled.mq.msg.TweetFromTwitter;
6+
7+
import java.util.List;
8+
9+
public interface FetchTweetsFromTwitterSearch {
10+
11+
List<TweetFromTwitter> splitMessage(Message<TaskMessage> message);
12+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.woehlke.twitterwall.scheduled.mq.endoint;
2+
3+
import org.springframework.messaging.Message;
4+
import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage;
5+
import org.woehlke.twitterwall.scheduled.mq.msg.TwitterProfileMessage;
6+
7+
import java.util.List;
8+
9+
public interface FetchUsersFromDefinedUserList {
10+
11+
List<TwitterProfileMessage> splitMessage(Message<TaskMessage> message);
12+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package org.woehlke.twitterwall.scheduled.mq.endoint.impl;
2+
3+
import org.springframework.beans.factory.annotation.Autowired;
4+
import org.springframework.messaging.Message;
5+
import org.springframework.social.twitter.api.Tweet;
6+
import org.springframework.stereotype.Component;
7+
import org.woehlke.twitterwall.oodm.entities.Task;
8+
import org.woehlke.twitterwall.oodm.service.TaskService;
9+
import org.woehlke.twitterwall.scheduled.mq.endoint.FetchTweetsFromTwitterSearch;
10+
import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage;
11+
import org.woehlke.twitterwall.scheduled.mq.msg.TweetFromTwitter;
12+
import org.woehlke.twitterwall.scheduled.service.backend.TwitterApiService;
13+
14+
import java.util.ArrayList;
15+
import java.util.List;
16+
17+
@Component("mqFetchTweetsFromTwitterSearch")
18+
public class FetchTweetsFromTwitterSearchImpl implements FetchTweetsFromTwitterSearch {
19+
20+
private final TwitterApiService twitterApiService;
21+
22+
private final TaskService taskService;
23+
24+
@Autowired
25+
public FetchTweetsFromTwitterSearchImpl(TwitterApiService twitterApiService, TaskService taskService) {
26+
this.twitterApiService = twitterApiService;
27+
this.taskService = taskService;
28+
}
29+
30+
@Override
31+
public List<TweetFromTwitter> splitMessage(Message<TaskMessage> message) {
32+
List<TweetFromTwitter> tweets = new ArrayList<>();
33+
TaskMessage msgIn = message.getPayload();
34+
long id = msgIn.getTaskId();
35+
Task task = taskService.findById(id);
36+
task = taskService.start(task);
37+
List<Tweet> twitterTweets = twitterApiService.findTweetsForSearchQuery();
38+
for (Tweet tweet: twitterTweets) {
39+
TweetFromTwitter tweetMsg = new TweetFromTwitter(task.getId(),tweet);
40+
tweets.add(tweetMsg);
41+
}
42+
return tweets;
43+
}
44+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package org.woehlke.twitterwall.scheduled.mq.endoint.impl;
2+
3+
import org.springframework.messaging.Message;
4+
import org.springframework.social.twitter.api.TwitterProfile;
5+
import org.springframework.stereotype.Component;
6+
import org.woehlke.twitterwall.conf.TwitterwallFrontendProperties;
7+
import org.woehlke.twitterwall.conf.TwitterwallSchedulerProperties;
8+
import org.woehlke.twitterwall.oodm.entities.Task;
9+
import org.woehlke.twitterwall.oodm.service.TaskService;
10+
import org.woehlke.twitterwall.scheduled.mq.endoint.FetchUsersFromDefinedUserList;
11+
import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage;
12+
import org.woehlke.twitterwall.scheduled.mq.msg.TwitterProfileMessage;
13+
import org.woehlke.twitterwall.scheduled.service.backend.TwitterApiService;
14+
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
18+
@Component("mqFetchUsersFromDefinedUserList")
19+
public class FetchUsersFromDefinedUserListImpl implements FetchUsersFromDefinedUserList {
20+
21+
private final TwitterwallSchedulerProperties twitterwallSchedulerProperties;
22+
23+
private final TwitterwallFrontendProperties twitterwallFrontendProperties;
24+
25+
private final TwitterApiService twitterApiService;
26+
27+
private final TaskService taskService;
28+
29+
public FetchUsersFromDefinedUserListImpl(TwitterwallSchedulerProperties twitterwallSchedulerProperties, TwitterwallFrontendProperties twitterwallFrontendProperties, TwitterApiService twitterApiService, TaskService taskService) {
30+
this.twitterwallSchedulerProperties = twitterwallSchedulerProperties;
31+
this.twitterwallFrontendProperties = twitterwallFrontendProperties;
32+
this.twitterApiService = twitterApiService;
33+
this.taskService = taskService;
34+
}
35+
36+
@Override
37+
public List<TwitterProfileMessage> splitMessage(Message<TaskMessage> message) {
38+
List<TwitterProfileMessage> userProfileList = new ArrayList<>();
39+
TaskMessage msgIn = message.getPayload();
40+
long id = msgIn.getTaskId();
41+
Task task = taskService.findById(id);
42+
task = taskService.start(task);
43+
String imprintScreenName = twitterwallFrontendProperties.getImprintScreenName();
44+
String fetchUsersList = twitterwallSchedulerProperties.getFetchUserList().getName();
45+
List<TwitterProfile> foundTwitterProfiles = twitterApiService.findUsersFromDefinedList(imprintScreenName,fetchUsersList);
46+
for (TwitterProfile twitterProfile : foundTwitterProfiles) {
47+
TwitterProfileMessage userMsg = new TwitterProfileMessage(msgIn,twitterProfile);
48+
userProfileList.add(userMsg);
49+
}
50+
return userProfileList;
51+
}
52+
}

src/main/java/org/woehlke/twitterwall/scheduled/mq/endoint/impl/StartTaskImpl.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ private void messageSender(final TaskType taskType) {
7979
@Scheduled(fixedRate = FIXED_RATE_FOR_SCHEDULAR_FETCH_TWEETS)
8080
public void fetchTweetsFromTwitterSearch() {
8181
TaskType taskType = TaskType.FETCH_TWEETS_FROM_TWITTER_SEARCH;
82-
messageSender(taskType);
82+
sendAndReceive(taskType);
8383
}
8484

8585
@Override
@@ -103,11 +103,32 @@ public void updateUserProfilesFromMentions() {
103103
messageSender(taskType);
104104
}
105105

106+
private void sendAndReceive(TaskType taskType){
107+
Task task = taskService.create("Start via MQ", taskType);
108+
TaskMessage taskMessage = new TaskMessage(task.getId(), taskType, task.getTimeStarted());
109+
Message<TaskMessage> mqMessage = MessageBuilder.withPayload(taskMessage)
110+
.setHeader("task_id", task.getId())
111+
.setHeader("task_uid", task.getUniqueId())
112+
.setHeader("task_type", task.getTaskType())
113+
.build();
114+
MessagingTemplate mqTemplate = new MessagingTemplate();
115+
Message<?> returnedMessage = mqTemplate.sendAndReceive(startTaskChannel, mqMessage);
116+
Object o = returnedMessage.getPayload();
117+
if( o instanceof UserMessage){
118+
UserMessage msg = (UserMessage) o;
119+
long taskId = msg.getTaskId();
120+
task = taskService.findById(taskId);
121+
taskService.done(task);
122+
} else {
123+
taskService.error(task,"Wrong type of returnedMessage");
124+
}
125+
}
126+
106127
@Override
107128
@Scheduled(fixedRate = FIXED_RATE_FOR_SCHEDULAR_FETCH_USER_LIST)
108129
public void fetchUsersFromDefinedUserList() {
109130
TaskType taskType = TaskType.FETCH_USERS_FROM_DEFINED_USER_LIST;
110-
messageSender(taskType);
131+
sendAndReceive(taskType);
111132
}
112133

113134
@Override

src/main/resources/integration.xml

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
</property>
1616
</bean>
1717

18+
<bean id="releaserSequenceSizeReleaseStrategy" class="org.springframework.integration.aggregator.SequenceSizeReleaseStrategy"/>
19+
20+
<bean id="releaserSimpleSequenceSizeReleaseStrategy" class="org.springframework.integration.aggregator.SimpleSequenceSizeReleaseStrategy"/>
1821
<!-- CHANNEL -->
1922

2023
<int:logging-channel-adapter id="logger" level="DEBUG"/>
@@ -104,14 +107,32 @@
104107

105108

106109
<int:chain input-channel="startFetchTweetsFromTwitterSearchChannel">
107-
<int:service-activator ref="mqFetchDataFromRemoteTwitterApi"
108-
method="fetchTweetsFromTwitterSearch" />
110+
<int:splitter id="splitter"
111+
ref="mqFetchTweetsFromTwitterSearch"
112+
method="splitMessage" />
113+
<int:service-activator ref="mqTweetTransformator"
114+
method="transformTweet" />
115+
<int:service-activator ref="mqTweetPersistor"
116+
method="persistTweet" />
117+
<int:aggregator message-store="store"
118+
release-strategy="releaserSimpleSequenceSizeReleaseStrategy" />
119+
<int:service-activator ref="mqTweetFinisher"
120+
method="finish" />
109121
</int:chain>
110122

111123

112124
<int:chain input-channel="startFetchUsersFromDefinedUserListChannel">
113-
<int:service-activator ref="mqFetchDataFromRemoteTwitterApi"
114-
method="fetchUsersFromDefinedUserList" />
125+
<int:splitter id="splitter"
126+
ref="mqFetchUsersFromDefinedUserList"
127+
method="splitMessage" />
128+
<int:service-activator ref="mqTweetTransformator"
129+
method="transformTweet" />
130+
<int:service-activator ref="mqTweetPersistor"
131+
method="persistTweet" />
132+
<int:aggregator message-store="store"
133+
release-strategy="releaserSimpleSequenceSizeReleaseStrategy" />
134+
<int:service-activator ref="mqTweetFinisher"
135+
method="finish" />
115136
</int:chain>
116137

117138
<int:chain input-channel="startFetchTestDataForUserChannel">

0 commit comments

Comments
 (0)