Skip to content

Commit f366b3a

Browse files
working on #141 #142 #143
1 parent a301664 commit f366b3a

File tree

8 files changed

+317
-9
lines changed

8 files changed

+317
-9
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.woehlke.twitterwall.scheduled.mq.endoint;
2+
3+
import org.springframework.messaging.Message;
4+
import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage;
5+
import org.woehlke.twitterwall.scheduled.mq.msg.TweetFromTwitter;
6+
7+
import java.util.List;
8+
9+
public interface UpdateTweets {
10+
11+
List<TweetFromTwitter> splitMessage(Message<TaskMessage> message);
12+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.woehlke.twitterwall.scheduled.mq.endoint;
2+
3+
import org.springframework.messaging.Message;
4+
import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage;
5+
import org.woehlke.twitterwall.scheduled.mq.msg.TwitterProfileMessage;
6+
7+
import java.util.List;
8+
9+
public interface UpdateUserProfiles {
10+
11+
List<TwitterProfileMessage> splitMessage(Message<TaskMessage> message);
12+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.woehlke.twitterwall.scheduled.mq.endoint;
2+
3+
import org.springframework.messaging.Message;
4+
import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage;
5+
import org.woehlke.twitterwall.scheduled.mq.msg.TwitterProfileMessage;
6+
7+
import java.util.List;
8+
9+
public interface UpdateUserProfilesFromMentions {
10+
11+
List<TwitterProfileMessage> splitMessage(Message<TaskMessage> message);
12+
}

src/main/java/org/woehlke/twitterwall/scheduled/mq/endoint/impl/StartTaskImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,21 +92,21 @@ public void fetchTweetsFromTwitterSearch() {
9292
@Scheduled(fixedRate = FIXED_RATE_FOR_SCHEDULAR_UPDATE_TWEETS)
9393
public void updateTweets() {
9494
TaskType taskType = TaskType.UPDATE_TWEETS;
95-
messageSender(taskType);
95+
sendAndReceive(taskType);
9696
}
9797

9898
@Override
9999
@Scheduled(fixedRate = FIXED_RATE_FOR_SCHEDULAR_UPDATE_USER)
100100
public void updateUserProfiles() {
101101
TaskType taskType = TaskType.UPDATE_USER_PROFILES;
102-
messageSender(taskType);
102+
sendAndReceive(taskType);
103103
}
104104

105105
@Override
106106
@Scheduled(fixedRate = FIXED_RATE_FOR_SCHEDULAR_UPDATE_USER_BY_MENTION)
107107
public void updateUserProfilesFromMentions() {
108108
TaskType taskType = TaskType.UPDATE_USER_PROFILES_FROM_MENTIONS;
109-
messageSender(taskType);
109+
sendAndReceive(taskType);
110110
}
111111

112112
private void sendAndReceive(TaskType taskType){
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package org.woehlke.twitterwall.scheduled.mq.endoint.impl;
2+
3+
import org.springframework.data.domain.Page;
4+
import org.springframework.data.domain.PageRequest;
5+
import org.springframework.data.domain.Pageable;
6+
import org.springframework.messaging.Message;
7+
import org.springframework.social.twitter.api.Tweet;
8+
import org.springframework.stereotype.Component;
9+
import org.woehlke.twitterwall.conf.TwitterProperties;
10+
import org.woehlke.twitterwall.oodm.entities.Task;
11+
import org.woehlke.twitterwall.oodm.entities.parts.CountedEntities;
12+
import org.woehlke.twitterwall.oodm.service.TaskService;
13+
import org.woehlke.twitterwall.oodm.service.TweetService;
14+
import org.woehlke.twitterwall.scheduled.mq.endoint.UpdateTweets;
15+
import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage;
16+
import org.woehlke.twitterwall.scheduled.mq.msg.TweetFromTwitter;
17+
import org.woehlke.twitterwall.scheduled.service.backend.TwitterApiService;
18+
import org.woehlke.twitterwall.scheduled.service.persist.CountedEntitiesService;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
23+
import static org.woehlke.twitterwall.frontend.controller.common.ControllerHelper.FIRST_PAGE_NUMBER;
24+
25+
@Component("mqUpdateTweets")
26+
public class UpdateTweetsImpl implements UpdateTweets {
27+
28+
private final TwitterProperties twitterProperties;
29+
30+
private final TweetService tweetService;
31+
32+
private final TwitterApiService twitterApiService;
33+
34+
private final TaskService taskService;
35+
36+
private final CountedEntitiesService countedEntitiesService;
37+
38+
public UpdateTweetsImpl(TwitterProperties twitterProperties, TweetService tweetService, TwitterApiService twitterApiService, TaskService taskService, CountedEntitiesService countedEntitiesService) {
39+
this.twitterProperties = twitterProperties;
40+
this.tweetService = tweetService;
41+
this.twitterApiService = twitterApiService;
42+
this.taskService = taskService;
43+
this.countedEntitiesService = countedEntitiesService;
44+
}
45+
46+
@Override
47+
public List<TweetFromTwitter> splitMessage(Message<TaskMessage> message) {
48+
CountedEntities countedEntities = countedEntitiesService.countAll();
49+
List<TweetFromTwitter> tweets = new ArrayList<>();
50+
TaskMessage msgIn = message.getPayload();
51+
long taskId = msgIn.getTaskId();
52+
Task task = taskService.findById(taskId);
53+
task = taskService.start(task,countedEntities);
54+
List<Long> worklistTwitterIds = new ArrayList<>();
55+
boolean hasNext=true;
56+
Pageable pageRequest = new PageRequest(FIRST_PAGE_NUMBER, twitterProperties.getPageSize());
57+
while(hasNext) {
58+
Page<Long> tweetTwitterIds = tweetService.findAllTwitterIds(pageRequest);
59+
hasNext = tweetTwitterIds.hasNext();
60+
worklistTwitterIds.addAll(tweetTwitterIds.getContent());
61+
}
62+
for(Long tweetTwitterId : worklistTwitterIds){
63+
Tweet foundTweetFromTwitter = twitterApiService.findOneTweetById(tweetTwitterId);
64+
TweetFromTwitter result = new TweetFromTwitter(task.getId(),foundTweetFromTwitter);
65+
tweets.add(result);
66+
}
67+
return tweets;
68+
}
69+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package org.woehlke.twitterwall.scheduled.mq.endoint.impl;
2+
3+
import org.springframework.data.domain.Page;
4+
import org.springframework.data.domain.PageRequest;
5+
import org.springframework.data.domain.Pageable;
6+
import org.springframework.messaging.Message;
7+
import org.springframework.social.twitter.api.TwitterProfile;
8+
import org.springframework.stereotype.Component;
9+
import org.woehlke.twitterwall.conf.TwitterProperties;
10+
import org.woehlke.twitterwall.conf.TwitterwallSchedulerProperties;
11+
import org.woehlke.twitterwall.oodm.entities.Mention;
12+
import org.woehlke.twitterwall.oodm.entities.Task;
13+
import org.woehlke.twitterwall.oodm.entities.parts.CountedEntities;
14+
import org.woehlke.twitterwall.oodm.service.MentionService;
15+
import org.woehlke.twitterwall.oodm.service.TaskService;
16+
import org.woehlke.twitterwall.scheduled.mq.endoint.UpdateUserProfilesFromMentions;
17+
import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage;
18+
import org.woehlke.twitterwall.scheduled.mq.msg.TwitterProfileMessage;
19+
import org.woehlke.twitterwall.scheduled.service.backend.TwitterApiService;
20+
import org.woehlke.twitterwall.scheduled.service.persist.CountedEntitiesService;
21+
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
25+
import static org.woehlke.twitterwall.frontend.controller.common.ControllerHelper.FIRST_PAGE_NUMBER;
26+
27+
@Component("mqUpdateUserProfilesFromMentions")
28+
public class UpdateUserProfilesFromMentionsImpl implements UpdateUserProfilesFromMentions {
29+
30+
private final TwitterwallSchedulerProperties twitterwallSchedulerProperties;
31+
32+
private final TwitterProperties twitterProperties;
33+
34+
private final TwitterApiService twitterApiService;
35+
36+
private final TaskService taskService;
37+
38+
private final MentionService mentionService;
39+
40+
private final CountedEntitiesService countedEntitiesService;
41+
42+
public UpdateUserProfilesFromMentionsImpl(TwitterwallSchedulerProperties twitterwallSchedulerProperties, TwitterProperties twitterProperties, TwitterApiService twitterApiService, TaskService taskService, MentionService mentionService, CountedEntitiesService countedEntitiesService) {
43+
this.twitterwallSchedulerProperties = twitterwallSchedulerProperties;
44+
this.twitterProperties = twitterProperties;
45+
this.twitterApiService = twitterApiService;
46+
this.taskService = taskService;
47+
this.mentionService = mentionService;
48+
this.countedEntitiesService = countedEntitiesService;
49+
}
50+
51+
@Override
52+
public List<TwitterProfileMessage> splitMessage(Message<TaskMessage> message) {
53+
CountedEntities countedEntities = countedEntitiesService.countAll();
54+
List<TwitterProfileMessage> userProfileList = new ArrayList<>();
55+
TaskMessage msgIn = message.getPayload();
56+
long id = msgIn.getTaskId();
57+
Task task = taskService.findById(id);
58+
task = taskService.start(task,countedEntities);
59+
List<String> screenNames = new ArrayList<>();
60+
int allLoop = 0;
61+
int loopId = 0;
62+
boolean hasNext=true;
63+
Pageable pageRequest = new PageRequest(FIRST_PAGE_NUMBER, twitterProperties.getPageSize());
64+
while (hasNext) {
65+
Page<Mention> allPersMentions = mentionService.getAll(pageRequest);
66+
hasNext = allPersMentions.hasNext();
67+
long number = allPersMentions.getTotalElements();
68+
for (Mention onePersMentions : allPersMentions) {
69+
if (!onePersMentions.hasPersistentUser()) {
70+
String screenName = onePersMentions.getScreenName();
71+
screenNames.add(screenName);
72+
}
73+
}
74+
pageRequest = pageRequest.next();
75+
}
76+
for(String screenName:screenNames){
77+
TwitterProfile userProfile = twitterApiService.getUserProfileForScreenName(screenName);
78+
if(userProfile!=null){
79+
TwitterProfileMessage userMsg = new TwitterProfileMessage(msgIn,userProfile);
80+
userProfileList.add(userMsg);
81+
}
82+
}
83+
return userProfileList;
84+
}
85+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package org.woehlke.twitterwall.scheduled.mq.endoint.impl;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.beans.factory.annotation.Autowired;
6+
import org.springframework.data.domain.Page;
7+
import org.springframework.data.domain.PageRequest;
8+
import org.springframework.data.domain.Pageable;
9+
import org.springframework.messaging.Message;
10+
import org.springframework.social.RateLimitExceededException;
11+
import org.springframework.social.twitter.api.TwitterProfile;
12+
import org.springframework.stereotype.Component;
13+
import org.woehlke.twitterwall.conf.TwitterProperties;
14+
import org.woehlke.twitterwall.oodm.entities.Task;
15+
import org.woehlke.twitterwall.oodm.entities.parts.CountedEntities;
16+
import org.woehlke.twitterwall.oodm.service.TaskService;
17+
import org.woehlke.twitterwall.oodm.service.UserService;
18+
import org.woehlke.twitterwall.scheduled.mq.endoint.UpdateUserProfiles;
19+
import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage;
20+
import org.woehlke.twitterwall.scheduled.mq.msg.TwitterProfileMessage;
21+
import org.woehlke.twitterwall.scheduled.service.backend.TwitterApiService;
22+
import org.woehlke.twitterwall.scheduled.service.persist.CountedEntitiesService;
23+
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
27+
import static org.woehlke.twitterwall.frontend.controller.common.ControllerHelper.FIRST_PAGE_NUMBER;
28+
29+
@Component("mqUpdateUserProfiles")
30+
public class UpdateUserProfilesImpl implements UpdateUserProfiles {
31+
32+
private static final Logger log = LoggerFactory.getLogger(UpdateUserProfilesImpl.class);
33+
34+
private final TwitterProperties twitterProperties;
35+
36+
private final TwitterApiService twitterApiService;
37+
38+
private final TaskService taskService;
39+
40+
private final UserService userService;
41+
42+
private final CountedEntitiesService countedEntitiesService;
43+
44+
@Autowired
45+
public UpdateUserProfilesImpl(TwitterProperties twitterProperties, TwitterApiService twitterApiService, TaskService taskService, UserService userService, CountedEntitiesService countedEntitiesService) {
46+
this.twitterProperties = twitterProperties;
47+
this.twitterApiService = twitterApiService;
48+
this.taskService = taskService;
49+
this.userService = userService;
50+
this.countedEntitiesService = countedEntitiesService;
51+
}
52+
53+
@Override
54+
public List<TwitterProfileMessage> splitMessage(Message<TaskMessage> message) {
55+
String msg = "mqUpdateUserProfiles.splitMessage: ";
56+
CountedEntities countedEntities = countedEntitiesService.countAll();
57+
List<TwitterProfileMessage> userProfileList = new ArrayList<>();
58+
TaskMessage msgIn = message.getPayload();
59+
long id = msgIn.getTaskId();
60+
Task task = taskService.findById(id);
61+
task = taskService.start(task,countedEntities);
62+
int allLoop = 0;
63+
int loopId = 0;
64+
boolean hasNext=true;
65+
List<Long> worklistProfileTwitterIds = new ArrayList<>();
66+
Pageable pageRequest = new PageRequest(FIRST_PAGE_NUMBER, twitterProperties.getPageSize());
67+
while (hasNext) {
68+
Page<Long> userProfileTwitterIds = userService.getAllTwitterIds(pageRequest);
69+
hasNext = userProfileTwitterIds.hasNext();
70+
worklistProfileTwitterIds.addAll(userProfileTwitterIds.getContent());
71+
}
72+
long number = worklistProfileTwitterIds.size();
73+
for(Long userProfileTwitterId:worklistProfileTwitterIds){
74+
allLoop++;
75+
loopId++;
76+
String counter = " ( " + loopId + " from " + number + " ) [" + allLoop + "] ";
77+
log.debug(msg + counter);
78+
TwitterProfile userProfile = null;
79+
try {
80+
userProfile = twitterApiService.getUserProfileForTwitterId(userProfileTwitterId);
81+
} catch (RateLimitExceededException e) {
82+
log.error(msg + counter+ "twitterApiService.getUserProfileForTwitterId("+userProfileTwitterId+") ",e);
83+
}
84+
if(userProfile != null){
85+
TwitterProfileMessage userMsg = new TwitterProfileMessage(msgIn,userProfile);
86+
userProfileList.add(userMsg);
87+
}
88+
}
89+
return userProfileList;
90+
}
91+
}

src/main/resources/integration.xml

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,20 +92,47 @@
9292
</int:router>
9393

9494
<int:chain input-channel="startUpdateTweetsChannel">
95-
<int:service-activator ref="mqFetchDataFromDatabase"
96-
method="updateTweets" />
95+
<int:splitter id="splitter"
96+
ref="mqUpdateTweets"
97+
method="splitMessage" />
98+
<int:service-activator ref="mqTweetTransformator"
99+
method="transformTweet" />
100+
<int:service-activator ref="mqTweetPersistor"
101+
method="persistTweet" />
102+
<int:aggregator message-store="store"
103+
release-strategy="releaserSimpleSequenceSizeReleaseStrategy" />
104+
<int:service-activator ref="mqTweetFinisher"
105+
method="finish" />
97106
</int:chain>
98107

99108

100109
<int:chain input-channel="startUpdateUserProfilesChannel">
101-
<int:service-activator ref="mqFetchDataFromDatabase"
102-
method="updateUserProfiles" />
110+
<int:splitter id="splitter"
111+
ref="mqUpdateUserProfiles"
112+
method="splitMessage" />
113+
<int:service-activator ref="mqUserTransformator"
114+
method="transformUser" />
115+
<int:service-activator ref="mqUserPersistor"
116+
method="persistUser" />
117+
<int:aggregator message-store="store"
118+
release-strategy="releaserSimpleSequenceSizeReleaseStrategy" />
119+
<int:service-activator ref="mqUserFinisher"
120+
method="finish" />
103121
</int:chain>
104122

105123

106124
<int:chain input-channel="startUpdateUserProfilesFromMentionsChannel">
107-
<int:service-activator ref="mqFetchDataFromDatabase"
108-
method="updateUserProfilesFromMentions" />
125+
<int:splitter id="splitter"
126+
ref="mqUpdateUserProfilesFromMentions"
127+
method="splitMessage" />
128+
<int:service-activator ref="mqUserTransformator"
129+
method="transformUser" />
130+
<int:service-activator ref="mqUserPersistor"
131+
method="persistUser" />
132+
<int:aggregator message-store="store"
133+
release-strategy="releaserSimpleSequenceSizeReleaseStrategy" />
134+
<int:service-activator ref="mqUserFinisher"
135+
method="finish" />
109136
</int:chain>
110137

111138

0 commit comments

Comments
 (0)