@@ -19,6 +19,7 @@ class SyncingService {
1919 onDownloadError;
2020 final Future <bool > Function (Attachment attachment, Object exception)?
2121 onUploadError;
22+ bool isProcessing = false ;
2223
2324 SyncingService (this .db, this .remoteStorage, this .localStorage,
2425 this .attachmentsService, this .getLocalUri,
@@ -103,139 +104,79 @@ class SyncingService {
103104 }
104105 }
105106
106- /// Function to manually run downloads for attachments marked for download
107- /// in the attachment queue.
108- /// Once a an attachment marked for download is found it will initiate a
109- /// download of the file to local storage.
110- StreamSubscription <void > watchDownloads () {
111- log.info ('Watching downloads...' );
112- return db.watch ('''
113- SELECT * FROM ${attachmentsService .table }
114- WHERE state = ${AttachmentState .queuedDownload .index }
115- ''' ).map ((results) {
116- return results.map ((row) => Attachment .fromRow (row));
117- }).listen ((attachments) async {
118- for (Attachment attachment in attachments) {
119- log.info ('Downloading ${attachment .filename }' );
120- await downloadAttachment (attachment);
121- }
122- });
123- }
107+ /// Handle downloading, uploading or deleting of attachments
108+ Future <void > handleSync (Iterable <Attachment > attachments) async {
109+ if (isProcessing == true ) {
110+ return ;
111+ }
124112
125- /// Watcher for attachments marked for download in the attachment queue.
126- /// Once a an attachment marked for download is found it will initiate a
127- /// download of the file to local storage.
128- Future <void > runDownloads () async {
129- List <Attachment > attachments = await db.execute ('''
130- SELECT * FROM ${attachmentsService .table }
131- WHERE state = ${AttachmentState .queuedDownload .index }
132- ''' ).then ((results) {
133- return results.map ((row) => Attachment .fromRow (row)).toList ();
134- });
113+ isProcessing = true ;
135114
136115 for (Attachment attachment in attachments) {
137- log.info ('Downloading ${attachment .filename }' );
138- await downloadAttachment (attachment);
139- }
140- }
141-
142- /// Watcher for attachments marked for upload in the attachment queue.
143- /// Once a an attachment marked for upload is found it will initiate an
144- /// upload of the file to remote storage.
145- StreamSubscription <void > watchUploads () {
146- log.info ('Watching uploads...' );
147- return db.watch ('''
148- SELECT * FROM ${attachmentsService .table }
149- WHERE local_uri IS NOT NULL
150- AND state = ${AttachmentState .queuedUpload .index }
151- ''' ).map ((results) {
152- return results.map ((row) => Attachment .fromRow (row));
153- }).listen ((attachments) async {
154- for (Attachment attachment in attachments) {
116+ if (AttachmentState .queuedDownload.index == attachment.state) {
117+ log.info ('Downloading ${attachment .filename }' );
118+ await downloadAttachment (attachment);
119+ }
120+ if (AttachmentState .queuedUpload.index == attachment.state) {
155121 log.info ('Uploading ${attachment .filename }' );
156122 await uploadAttachment (attachment);
157123 }
158- });
159- }
160-
161- /// Function to manually run uploads for attachments marked for upload
162- /// in the attachment queue.
163- /// Once a an attachment marked for deletion is found it will initiate an
164- /// upload of the file to remote storage
165- Future <void > runUploads () async {
166- List <Attachment > attachments = await db.execute ('''
167- SELECT * FROM ${attachmentsService .table }
168- WHERE local_uri IS NOT NULL
169- AND state = ${AttachmentState .queuedUpload .index }
170- ''' ).then ((results) {
171- return results.map ((row) => Attachment .fromRow (row)).toList ();
172- });
173-
174- for (Attachment attachment in attachments) {
175- log.info ('Uploading ${attachment .filename }' );
176- await uploadAttachment (attachment);
124+ if (AttachmentState .queuedDelete.index == attachment.state) {
125+ log.info ('Deleting ${attachment .filename }' );
126+ await deleteAttachment (attachment);
127+ }
177128 }
129+
130+ isProcessing = false ;
178131 }
179132
180- /// Watcher for attachments marked for deletion in the attachment queue.
181- /// Once a an attachment marked for deletion is found it will initiate remote
182- /// and local deletions of the file.
183- StreamSubscription <void > watchDeletes () {
184- log.info ('Watching deletes...' );
133+ /// Watcher for changes to attachments table
134+ /// Once a change is detected it will initiate a sync of the attachments
135+ StreamSubscription <void > watchAttachments () {
136+ log.info ('Watching attachments...' );
185137 return db.watch ('''
186138 SELECT * FROM ${attachmentsService .table }
187- WHERE state = ${AttachmentState .queuedDelete .index }
139+ WHERE state ! = ${AttachmentState .archived .index }
188140 ''' ).map ((results) {
189141 return results.map ((row) => Attachment .fromRow (row));
190142 }).listen ((attachments) async {
191- for (Attachment attachment in attachments) {
192- log.info ('Deleting ${attachment .filename }' );
193- await deleteAttachment (attachment);
194- }
143+ await handleSync (attachments);
195144 });
196145 }
197146
198- /// Function to manually run deletes for attachments marked for deletion
199- /// in the attachment queue.
200- /// Once a an attachment marked for deletion is found it will initiate remote
201- /// and local deletions of the file.
202- Future <void > runDeletes () async {
147+ /// Run the sync process on all attachments
148+ Future <void > runSync () async {
203149 List <Attachment > attachments = await db.execute ('''
204150 SELECT * FROM ${attachmentsService .table }
205- WHERE state = ${AttachmentState .queuedDelete .index }
151+ WHERE state ! = ${AttachmentState .archived .index }
206152 ''' ).then ((results) {
207153 return results.map ((row) => Attachment .fromRow (row)).toList ();
208154 });
209155
210- for (Attachment attachment in attachments) {
211- log.info ('Deleting ${attachment .filename }' );
212- await deleteAttachment (attachment);
213- }
156+ await handleSync (attachments);
214157 }
215158
216- /// Reconcile an ID with ID's in the attachment queue.
217- /// If the ID is not in the queue, but the file exists locally then it is
218- /// in local and remote storage.
219- /// If the ID is in the queue, but the file does not exist locally then it is
220- /// marked for download.
221- reconcileId (String id, List <String > idsInQueue, String fileExtension) async {
222- bool idIsInQueue = idsInQueue.contains (id);
159+ /// Process ID's to be included in the attachment queue.
160+ processIds (List <String > ids, String fileExtension) async {
161+ List <Attachment > attachments = List .empty (growable: true );
223162
224- String path = await getLocalUri ('$id .$fileExtension ' );
225- File file = File (path);
226- bool fileExists = await file.exists ();
163+ for (String id in ids) {
164+ String path = await getLocalUri ('$id .$fileExtension ' );
165+ File file = File (path);
166+ bool fileExists = await file.exists ();
227167
228- if (! idIsInQueue) {
229168 if (fileExists) {
230169 log.info ('ignore file $id .$fileExtension as it already exists' );
231170 return ;
232171 }
172+
233173 log.info ('Adding $id to queue' );
234- return await attachmentsService.saveAttachment (Attachment (
235- id: id,
236- filename: '$id .$fileExtension ' ,
237- state: AttachmentState .queuedDownload.index,
238- ));
174+ attachments.add (Attachment (
175+ id: id,
176+ filename: '$id .$fileExtension ' ,
177+ state: AttachmentState .queuedDownload.index));
239178 }
179+
180+ await attachmentsService.saveAttachments (attachments);
240181 }
241182}
0 commit comments