Skip to content

Commit 02ff51b

Browse files
committed
Ensure that events of same file are processed in sequence and not in parallel.
1 parent d1dd8e9 commit 02ff51b

File tree

6 files changed

+221
-17
lines changed

6 files changed

+221
-17
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33
/obj
44
/packages
55
/*.bak
6+
/*.cs~

AsyncToSyncCodeRoundtripSynchroniserMonitorNet.csproj

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
77
<ProjectGuid>{7D900E57-16D0-4671-AE44-FBC9FE3CD639}</ProjectGuid>
88
<OutputType>Exe</OutputType>
9-
<RootNamespace>AsyncToSyncCodeRoundtripSynchroniserMonitorNet</RootNamespace>
9+
<RootNamespace>AsyncToSyncCodeRoundtripSynchroniserMonitor</RootNamespace>
1010
<AssemblyName>AsyncToSyncCodeRoundtripSynchroniserMonitor</AssemblyName>
1111
<TargetFrameworkVersion>v4.6</TargetFrameworkVersion>
1212
<FileAlignment>512</FileAlignment>
@@ -23,7 +23,7 @@
2323
<DefineConstants>DEBUG;TRACE</DefineConstants>
2424
<ErrorReport>prompt</ErrorReport>
2525
<WarningLevel>4</WarningLevel>
26-
<NoWarn>SEC0122;S125;S1125;S1135;S2589;S3881;S3358;CA1063;CCN0011;CCN0021;CCN0031;1701;1702;AsyncFixed01;MS002;IDE0018;AD0001</NoWarn>
26+
<NoWarn>SEC0122;S125;S1125;S1135;S2589;S3881;S3358;CA1063;CCN0011;CCN0021;CCN0031;1701;1702;AsyncFixed01;MS002;MS003;IDE0018;AD0001;SEC0112</NoWarn>
2727
<WarningsAsErrors>NU1605</WarningsAsErrors>
2828
</PropertyGroup>
2929
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
@@ -34,7 +34,7 @@
3434
<DefineConstants>TRACE</DefineConstants>
3535
<ErrorReport>prompt</ErrorReport>
3636
<WarningLevel>4</WarningLevel>
37-
<NoWarn>SEC0122;S125;S1125;S1135;S2589;S3881;S3358;CA1063;CCN0011;CCN0021;CCN0031;1701;1702;AsyncFixed01;MS002;IDE0018;AD0001</NoWarn>
37+
<NoWarn>SEC0122;S125;S1125;S1135;S2589;S3881;S3358;CA1063;CCN0011;CCN0021;CCN0031;1701;1702;AsyncFixed01;MS002;MS003;IDE0018;AD0001;SEC0112</NoWarn>
3838
<WarningsAsErrors>NU1605</WarningsAsErrors>
3939
<DebugSymbols>true</DebugSymbols>
4040
</PropertyGroup>
@@ -102,6 +102,7 @@
102102
<HintPath>packages\System.Console.4.3.0\lib\net46\System.Console.dll</HintPath>
103103
</Reference>
104104
<Reference Include="System.Core" />
105+
<Reference Include="System.Data.Linq" />
105106
<Reference Include="System.Diagnostics.DiagnosticSource, Version=4.0.1.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
106107
<HintPath>packages\System.Diagnostics.DiagnosticSource.4.3.0\lib\net46\System.Diagnostics.DiagnosticSource.dll</HintPath>
107108
</Reference>
@@ -150,10 +151,12 @@
150151
</ItemGroup>
151152
<ItemGroup>
152153
<Compile Include="AsyncToSyncConverter.cs" />
154+
<Compile Include="BinaryFileExtensions.cs" />
153155
<Compile Include="Extensions.cs" />
154156
<Compile Include="FileExtensions.cs" />
155157
<Compile Include="Program.cs" />
156158
<Compile Include="Properties\AssemblyInfo.cs" />
159+
<Compile Include="Synchronisation.cs" />
157160
<Compile Include="SyncToAsyncConverter.cs" />
158161
</ItemGroup>
159162
<ItemGroup>

BinaryFileExtensions.cs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
//
2+
// Copyright (c) Roland Pihlakas 2019 - 2020
3+
// roland@simplify.ee
4+
//
5+
// Roland Pihlakas licenses this file to you under the GNU Lesser General Public License, ver 2.1.
6+
// See the LICENSE file for more information.
7+
//
8+
9+
using System.Data.Linq;
10+
using System.IO;
11+
using System.Threading;
12+
using System.Threading.Tasks;
13+
14+
namespace AsyncToSyncCodeRoundtripSynchroniserMonitor
15+
{
16+
public static partial class FileExtensions
17+
{
18+
//https://stackoverflow.com/questions/18472867/checking-equality-for-two-byte-arrays/
19+
public static bool BinaryEqual(Binary a1, Binary b1)
20+
{
21+
return a1.Equals(b1);
22+
}
23+
24+
public static async Task<byte[]> ReadAllBytesAsync(string path, CancellationToken cancellationToken = default(CancellationToken))
25+
{
26+
using (FileStream stream = new FileStream(
27+
path,
28+
FileMode.Open,
29+
FileAccess.Read,
30+
FileShare.ReadWrite,
31+
bufferSize: 1024 * 1024,
32+
useAsync: true
33+
))
34+
{
35+
byte[] result = new byte[stream.Length];
36+
await stream.ReadAsync(result, 0, (int)stream.Length, cancellationToken);
37+
return result;
38+
}
39+
}
40+
41+
public static async Task WriteAllBytesAsync(string path, byte[] contents, CancellationToken cancellationToken = default(CancellationToken))
42+
{
43+
using (FileStream stream = new FileStream(
44+
path,
45+
FileMode.OpenOrCreate,
46+
FileAccess.Write,
47+
FileShare.Read,
48+
bufferSize: 1024 * 1024,
49+
useAsync: true
50+
))
51+
{
52+
await stream.WriteAsync(contents, 0, contents.Length, cancellationToken);
53+
}
54+
}
55+
56+
}
57+
}

FileExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
namespace AsyncToSyncCodeRoundtripSynchroniserMonitor
2020
{
21-
public static class FileExtensions
21+
public static partial class FileExtensions
2222
{
2323
//adapted from https://github.com/dotnet/runtime/blob/5ddc873d9ea6cd4bc6a935fec3057fe89a6932aa/src/libraries/System.IO.FileSystem/src/System/IO/File.cs
2424

Program.cs

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,8 @@ internal class ConsoleWatch
322322
public static bool DoingInitialSync = false;
323323
#pragma warning restore S2223
324324

325+
private static readonly AsyncLockQueueDictionary FileLocks = new AsyncLockQueueDictionary();
326+
325327

326328
public ConsoleWatch(IWatcher3 watch)
327329
{
@@ -468,7 +470,7 @@ public static async Task FileUpdated(string fullName, Context context)
468470
}
469471
else //Assume ResX file
470472
{
471-
var fileData = await FileExtensions.ReadAllTextAsync(fullName, context.Token);
473+
var fileData = await FileExtensions.ReadAllBytesAsync(fullName, context.Token);
472474
var originalData = fileData;
473475

474476
//save without transformations
@@ -567,8 +569,12 @@ private static async Task OnRenamedAsync(IRenamedFileSystemEvent rfse, Cancellat
567569
//NB! if file is renamed to cs~ or resx~ then that means there will be yet another write to same file, so lets skip this event here
568570
if (!rfse.FileSystemInfo.FullName.EndsWith("~"))
569571
{
570-
await FileUpdated(rfse.FileSystemInfo.FullName, context);
571-
await FileDeleted(rfse.PreviousFileSystemInfo.FullName, context);
572+
using (await FileLocks.LockAsync(rfse.FileSystemInfo.FullName, token))
573+
using (await FileLocks.LockAsync(rfse.PreviousFileSystemInfo.FullName, token))
574+
{
575+
await FileUpdated(rfse.FileSystemInfo.FullName, context);
576+
await FileDeleted(rfse.PreviousFileSystemInfo.FullName, context);
577+
}
572578
}
573579
}
574580
}
@@ -596,9 +602,12 @@ private static async Task OnRemovedAsync(IFileSystemEvent fse, CancellationToken
596602
if (IsWatchedFile(fse.FileSystemInfo.FullName))
597603
{
598604
await AddMessage(ConsoleColor.Yellow, $"[{(fse.IsFile ? "F" : "D")}][-]:{fse.FileSystemInfo.FullName}", context);
599-
}
600605

601-
await FileDeleted(fse.FileSystemInfo.FullName, context);
606+
using (await FileLocks.LockAsync(fse.FileSystemInfo.FullName, token))
607+
{
608+
await FileDeleted(fse.FileSystemInfo.FullName, context);
609+
}
610+
}
602611
}
603612
else
604613
{
@@ -619,12 +628,15 @@ public static async Task OnAddedAsync(IFileSystemEvent fse, CancellationToken to
619628
{
620629
if (fse.IsFile)
621630
{
622-
//if (IsWatchedFile(fse.FileSystemInfo.FullName))
623-
//{
624-
// await AddMessage(ConsoleColor.Green, $"[{(fse.IsFile ? "F" : "D")}][+]:{fse.FileSystemInfo.FullName}", context);
625-
//}
631+
if (IsWatchedFile(fse.FileSystemInfo.FullName))
632+
{
633+
//await AddMessage(ConsoleColor.Green, $"[{(fse.IsFile ? "F" : "D")}][+]:{fse.FileSystemInfo.FullName}", context);
626634

627-
await FileUpdated(fse.FileSystemInfo.FullName, context);
635+
using (await FileLocks.LockAsync(fse.FileSystemInfo.FullName, token))
636+
{
637+
await FileUpdated(fse.FileSystemInfo.FullName, context);
638+
}
639+
}
628640
}
629641
else
630642
{
@@ -648,9 +660,12 @@ private static async Task OnTouchedAsync(IFileSystemEvent fse, CancellationToken
648660
if (IsWatchedFile(fse.FileSystemInfo.FullName))
649661
{
650662
await AddMessage(ConsoleColor.Gray, $"[{(fse.IsFile ? "F" : "D")}][T]:{fse.FileSystemInfo.FullName}", context);
651-
}
652663

653-
await FileUpdated(fse.FileSystemInfo.FullName, context);
664+
using (await FileLocks.LockAsync(fse.FileSystemInfo.FullName, token))
665+
{
666+
await FileUpdated(fse.FileSystemInfo.FullName, context);
667+
}
668+
}
654669
}
655670
else
656671
{
@@ -710,7 +725,10 @@ public static async Task SaveFileModifications(string fullName, string fileData,
710725
? await FileExtensions.ReadAllTextAsync(otherFullName, context.Token)
711726
: null;
712727

713-
if ((otherFileData?.Length ?? -1) != fileData.Length || otherFileData != fileData)
728+
if (
729+
(otherFileData?.Length ?? -1) != fileData.Length
730+
|| otherFileData != fileData
731+
)
714732
{
715733
await DeleteFile(otherFullName, context);
716734

@@ -722,6 +740,51 @@ public static async Task SaveFileModifications(string fullName, string fileData,
722740
Global.ConverterSavedFileDates[otherFullName] = now;
723741

724742

743+
await AddMessage(ConsoleColor.Magenta, $"Synchronised updates from file {fullName}", context);
744+
}
745+
else if (false)
746+
{
747+
//touch the file
748+
var now = DateTime.UtcNow; //NB! compute common now for ConverterSavedFileDates
749+
750+
try
751+
{
752+
File.SetLastWriteTimeUtc(otherFullName, now);
753+
}
754+
catch (Exception ex)
755+
{
756+
await ConsoleWatch.WriteException(ex, context);
757+
}
758+
759+
Global.ConverterSavedFileDates[otherFullName] = now;
760+
}
761+
}
762+
763+
public static async Task SaveFileModifications(string fullName, byte[] fileData, byte[] originalData, Context context)
764+
{
765+
var otherFullName = GetOtherFullName(fullName);
766+
767+
768+
//NB! detect whether the file actually changed
769+
var otherFileData = File.Exists(otherFullName)
770+
? await FileExtensions.ReadAllBytesAsync(otherFullName, context.Token)
771+
: null;
772+
773+
if (
774+
(otherFileData?.Length ?? -1) != fileData.Length
775+
|| !FileExtensions.BinaryEqual(otherFileData, fileData)
776+
)
777+
{
778+
await DeleteFile(otherFullName, context);
779+
780+
Directory.CreateDirectory(Path.GetDirectoryName(otherFullName));
781+
782+
await FileExtensions.WriteAllBytesAsync(otherFullName, fileData, context.Token);
783+
784+
var now = DateTime.UtcNow; //NB! compute now after saving the file
785+
Global.ConverterSavedFileDates[otherFullName] = now;
786+
787+
725788
await AddMessage(ConsoleColor.Magenta, $"Synchronised updates from file {fullName}", context);
726789
}
727790
else if (false)

Synchronisation.cs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Nito.AsyncEx;
6+
7+
namespace AsyncToSyncCodeRoundtripSynchroniserMonitor
8+
{
9+
public class AsyncLockQueueDictionary
10+
{
11+
private readonly object DictionaryAccessMutex = new object();
12+
private readonly Dictionary<string, AsyncLockWithCount> LockQueueDictionary = new Dictionary<string, AsyncLockWithCount>();
13+
14+
public sealed class AsyncLockWithCount
15+
{
16+
public readonly AsyncLock LockEntry;
17+
public int WaiterCount;
18+
19+
public AsyncLockWithCount()
20+
{
21+
this.LockEntry = new AsyncLock();
22+
this.WaiterCount = 1;
23+
}
24+
}
25+
26+
public sealed class LockDictReleaser : IDisposable
27+
{
28+
public readonly string Name;
29+
public readonly AsyncLockWithCount LockEntry;
30+
public readonly IDisposable LockHandle;
31+
public readonly AsyncLockQueueDictionary AsyncLockQueueDictionary;
32+
33+
internal LockDictReleaser(string name, AsyncLockWithCount lockEntry, IDisposable lockHandle, AsyncLockQueueDictionary asyncLockQueueDictionary)
34+
{
35+
this.Name = name;
36+
this.LockEntry = lockEntry;
37+
this.LockHandle = lockHandle;
38+
this.AsyncLockQueueDictionary = asyncLockQueueDictionary;
39+
}
40+
41+
public void Dispose()
42+
{
43+
this.AsyncLockQueueDictionary.ReleaseLock(this.Name, this.LockEntry, this.LockHandle);
44+
}
45+
}
46+
47+
private void ReleaseLock(string name, AsyncLockWithCount lockEntry, IDisposable lockHandle)
48+
{
49+
lockHandle.Dispose();
50+
51+
lock (DictionaryAccessMutex)
52+
{
53+
if (lockEntry.WaiterCount == 0) //NB!
54+
{
55+
LockQueueDictionary.Remove(name);
56+
}
57+
}
58+
}
59+
60+
public async Task<LockDictReleaser> LockAsync(string name, CancellationToken cancellationToken = default(CancellationToken))
61+
{
62+
AsyncLockWithCount lockEntry;
63+
lock (DictionaryAccessMutex)
64+
{
65+
if (!LockQueueDictionary.TryGetValue(name, out lockEntry))
66+
{
67+
lockEntry = new AsyncLockWithCount();
68+
LockQueueDictionary.Add(name, lockEntry);
69+
}
70+
else
71+
{
72+
lockEntry.WaiterCount++; //NB! must be done inside the lock and BEFORE waiting for the lock
73+
}
74+
}
75+
76+
var lockHandle = await lockEntry.LockEntry.LockAsync(cancellationToken);
77+
return new LockDictReleaser(name, lockEntry, lockHandle, this);
78+
}
79+
}
80+
}

0 commit comments

Comments
 (0)