2727import com .uber .cadence .internal .worker .DecisionTaskHandler ;
2828import com .uber .cadence .internal .worker .LocalActivityWorker ;
2929import com .uber .cadence .internal .worker .LocallyDispatchedActivityWorker ;
30+ import com .uber .cadence .internal .worker .LocallyDispatchedActivityWorker .Task ;
3031import com .uber .cadence .internal .worker .SingleWorkerOptions ;
3132import com .uber .cadence .internal .worker .SuspendableWorker ;
3233import com .uber .cadence .internal .worker .WorkflowWorker ;
4445import java .util .function .Consumer ;
4546import java .util .function .Function ;
4647
47- /** Workflow worker that supports POJO workflow implementations. */
48+ /**
49+ * Workflow worker that supports POJO workflow implementations.
50+ */
4851public class SyncWorkflowWorker
4952 implements SuspendableWorker , Consumer <PollForDecisionTaskResponse > {
5053
5154 private final WorkflowWorker workflowWorker ;
5255 private final LocalActivityWorker laWorker ;
53- private final LocallyDispatchedActivityWorker ldaWorker ;
54-
5556 private final POJOWorkflowImplementationFactory factory ;
5657 private final DataConverter dataConverter ;
5758 private final POJOActivityTaskHandler laTaskHandler ;
58- private final POJOActivityTaskHandler ldaTaskHandler ;
5959 private final ScheduledExecutorService heartbeatExecutor = Executors .newScheduledThreadPool (4 );
6060 private final ScheduledExecutorService ldaHeartbeatExecutor = Executors .newScheduledThreadPool (4 );
61+ private LocallyDispatchedActivityWorker ldaWorker ;
62+ private POJOActivityTaskHandler ldaTaskHandler ;
6163
6264 public SyncWorkflowWorker (
6365 IWorkflowService service ,
@@ -97,15 +99,21 @@ public SyncWorkflowWorker(
9799 stickyDecisionScheduleToStartTimeout ,
98100 service ,
99101 laWorker .getLocalActivityTaskPoller ());
100- ldaTaskHandler =
101- new POJOActivityTaskHandler (
102- service ,
103- domain ,
104- locallyDispatchedActivityOptions .getDataConverter (),
105- ldaHeartbeatExecutor );
106- ldaWorker =
107- new LocallyDispatchedActivityWorker (
108- service , domain , taskList , locallyDispatchedActivityOptions , ldaTaskHandler );
102+
103+ Function <Task , Boolean > locallyDispatchedActivityTaskPoller = null ;
104+ // do not dispatch locally if TaskListActivitiesPerSecond is set
105+ if (locallyDispatchedActivityOptions .getTaskListActivitiesPerSecond () == 0 ) {
106+ ldaTaskHandler =
107+ new POJOActivityTaskHandler (
108+ service ,
109+ domain ,
110+ locallyDispatchedActivityOptions .getDataConverter (),
111+ ldaHeartbeatExecutor );
112+ ldaWorker =
113+ new LocallyDispatchedActivityWorker (
114+ service , domain , taskList , locallyDispatchedActivityOptions , ldaTaskHandler );
115+ locallyDispatchedActivityTaskPoller = ldaWorker .getLocallyDispatchedActivityTaskPoller ();
116+ }
109117
110118 workflowWorker =
111119 new WorkflowWorker (
@@ -114,7 +122,7 @@ public SyncWorkflowWorker(
114122 taskList ,
115123 workflowOptions ,
116124 taskHandler ,
117- ldaWorker . getLocallyDispatchedActivityTaskPoller () ,
125+ locallyDispatchedActivityTaskPoller ,
118126 stickyTaskListName );
119127 }
120128
@@ -137,7 +145,9 @@ public void setLocalActivitiesImplementation(Object... activitiesImplementation)
137145 }
138146
139147 public void setActivitiesImplementationToDispatchLocally (Object ... activitiesImplementation ) {
140- this .ldaTaskHandler .setActivitiesImplementation (activitiesImplementation );
148+ if (this .ldaTaskHandler != null ) {
149+ this .ldaTaskHandler .setActivitiesImplementation (activitiesImplementation );
150+ }
141151 }
142152
143153 @ Override
@@ -147,69 +157,84 @@ public void start() {
147157 // to start LocalActivity Worker.
148158 if (workflowWorker .isStarted ()) {
149159 laWorker .start ();
150- ldaWorker .start ();
160+ if (ldaWorker != null ) {
161+ ldaWorker .start ();
162+ }
151163 }
152164 }
153165
154166 @ Override
155167 public boolean isStarted () {
156- return workflowWorker .isStarted () && laWorker .isStarted () && ldaWorker .isStarted ();
168+ return workflowWorker .isStarted () && laWorker .isStarted () && (ldaWorker == null || ldaWorker
169+ .isStarted ());
157170 }
158171
159172 @ Override
160173 public boolean isShutdown () {
161- return workflowWorker .isShutdown () && laWorker .isShutdown () && ldaWorker .isShutdown ();
174+ return workflowWorker .isShutdown () && laWorker .isShutdown () && (ldaWorker == null || ldaWorker
175+ .isShutdown ());
162176 }
163177
164178 @ Override
165179 public boolean isTerminated () {
166180 return workflowWorker .isTerminated ()
167181 && laWorker .isTerminated ()
168182 && ldaHeartbeatExecutor .isTerminated ()
169- && ldaWorker .isTerminated ();
183+ && ( ldaWorker == null || ldaWorker .isTerminated () );
170184 }
171185
172186 @ Override
173187 public void shutdown () {
174188 laWorker .shutdown ();
175189 ldaHeartbeatExecutor .shutdown ();
176- ldaWorker .shutdown ();
190+ if (ldaWorker != null ) {
191+ ldaWorker .shutdown ();
192+ }
177193 workflowWorker .shutdown ();
178194 }
179195
180196 @ Override
181197 public void shutdownNow () {
182198 laWorker .shutdownNow ();
183199 ldaHeartbeatExecutor .shutdownNow ();
184- ldaWorker .shutdownNow ();
200+ if (ldaWorker != null ) {
201+ ldaWorker .shutdownNow ();
202+ }
185203 workflowWorker .shutdownNow ();
186204 }
187205
188206 @ Override
189207 public void awaitTermination (long timeout , TimeUnit unit ) {
190208 long timeoutMillis = InternalUtils .awaitTermination (laWorker , unit .toMillis (timeout ));
191209 timeoutMillis = InternalUtils .awaitTermination (ldaHeartbeatExecutor , timeoutMillis );
192- timeoutMillis = InternalUtils .awaitTermination (ldaWorker , timeoutMillis );
210+ if (ldaWorker != null ) {
211+ timeoutMillis = InternalUtils .awaitTermination (ldaWorker , timeoutMillis );
212+ }
193213 InternalUtils .awaitTermination (workflowWorker , timeoutMillis );
194214 }
195215
196216 @ Override
197217 public void suspendPolling () {
198218 workflowWorker .suspendPolling ();
199219 laWorker .suspendPolling ();
200- ldaWorker .suspendPolling ();
220+ if (ldaWorker != null ) {
221+ ldaWorker .suspendPolling ();
222+ }
201223 }
202224
203225 @ Override
204226 public void resumePolling () {
205227 workflowWorker .resumePolling ();
206228 laWorker .resumePolling ();
207- ldaWorker .resumePolling ();
229+ if (ldaWorker != null ) {
230+ ldaWorker .resumePolling ();
231+ }
208232 }
209233
210234 @ Override
211235 public boolean isSuspended () {
212- return workflowWorker .isSuspended () && laWorker .isSuspended () && ldaWorker .isSuspended ();
236+ return workflowWorker .isSuspended () && laWorker .isSuspended () && (ldaWorker == null || ldaWorker
237+ .isSuspended ());
213238 }
214239
215240 public <R > R queryWorkflowExecution (
0 commit comments