644 lines
20 KiB
C#
644 lines
20 KiB
C#
using System;
|
|
using System.Threading;
|
|
|
|
namespace Cysharp.Threading.Tasks
|
|
{
|
|
/// <summary>
/// Read-only view of a reactive property: exposes the current value and can be
/// enumerated asynchronously as an <see cref="IUniTaskAsyncEnumerable{T}"/>.
/// </summary>
/// <typeparam name="T">Type of the value held by the property.</typeparam>
public interface IReadOnlyAsyncReactiveProperty<T> : IUniTaskAsyncEnumerable<T>
{
    /// <summary>Gets the value currently held by the property.</summary>
    T Value { get; }

    /// <summary>
    /// Returns a sequence over this property. In the implementations in this file the
    /// resulting enumerator is created with <c>publishCurrentValue: false</c>, so its
    /// first <c>MoveNextAsync</c> does not yield the value held at that moment.
    /// </summary>
    IUniTaskAsyncEnumerable<T> WithoutCurrent();

    /// <summary>
    /// Returns a task producing a <typeparamref name="T"/>. In the implementations in this
    /// file the task completes with the next value delivered to the handler's <c>OnNext</c>.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the wait.</param>
    UniTask<T> WaitAsync(CancellationToken cancellationToken = default);
}
|
|
|
|
/// <summary>
/// Writable counterpart of <see cref="IReadOnlyAsyncReactiveProperty{T}"/>: the inherited
/// getter-only <c>Value</c> is hidden by a property that also has a setter.
/// </summary>
/// <typeparam name="T">Type of the value held by the property.</typeparam>
public interface IAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>
{
    /// <summary>Gets or sets the value held by the property.</summary>
    new T Value { get; set; }
}
|
|
|
|
[Serializable]
|
|
public class AsyncReactiveProperty<T> : IAsyncReactiveProperty<T>, IDisposable
|
|
{
|
|
// Handler list used to publish values and completion to enumerators and WaitAsync sources.
TriggerEvent<T> triggerEvent;

// Most recently assigned value; exposed to the Unity serializer when building for Unity 2018.3+.
#if UNITY_2018_3_OR_NEWER
[UnityEngine.SerializeField]
#endif
T latestValue;
|
|
|
|
/// <summary>
/// Gets the latest value. Setting it stores the new value first and then publishes it
/// through the trigger event; no equality check is made against the previous value.
/// </summary>
public T Value
{
    get => latestValue;
    set
    {
        latestValue = value;
        triggerEvent.SetResult(value);
    }
}
|
|
|
|
/// <summary>Creates the property holding <paramref name="value"/> with an empty trigger event.</summary>
/// <param name="value">Initial value returned by <c>Value</c>.</param>
public AsyncReactiveProperty(T value)
{
    triggerEvent = default;
    latestValue = value;
}
|
|
|
|
/// <summary>
/// Returns a sequence whose enumerators are created with <c>publishCurrentValue: false</c>,
/// i.e. they do not start by yielding the value held at enumeration time.
/// </summary>
public IUniTaskAsyncEnumerable<T> WithoutCurrent() => new WithoutCurrentEnumerable(this);
|
|
|
|
/// <summary>
/// Creates an enumerator whose first <c>MoveNextAsync</c> yields the current value and
/// whose later calls wait for newly published values.
/// </summary>
/// <param name="cancellationToken">Token handed to the enumerator for cancellation.</param>
public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
{
    const bool publishCurrentValue = true;
    return new Enumerator(this, cancellationToken, publishCurrentValue);
}
|
|
|
|
/// <summary>Signals completion through the trigger event.</summary>
public void Dispose() => triggerEvent.SetCompleted();
|
|
|
|
/// <summary>Unwraps the property to its current <c>Value</c>.</summary>
/// <param name="value">Property to read; dereferenced without a null check.</param>
public static implicit operator T(AsyncReactiveProperty<T> value) => value.Value;
|
|
|
|
/// <summary>
/// Returns the string form of the latest value. Value types are formatted directly;
/// for reference types a null value yields null instead of throwing.
/// </summary>
public override string ToString()
{
    return isValueType
        ? latestValue.ToString()
        : latestValue?.ToString();
}
|
|
|
|
/// <summary>
/// Returns a task backed by a <c>WaitAsyncSource</c> registered on this property;
/// that source completes with the next value delivered to its <c>OnNext</c>.
/// </summary>
/// <param name="cancellationToken">Token used to cancel the wait.</param>
public UniTask<T> WaitAsync(CancellationToken cancellationToken = default)
{
    var source = WaitAsyncSource.Create(this, cancellationToken, out var version);
    return new UniTask<T>(source, version);
}
|
|
|
|
// Whether T is a value type; computed once per closed generic type and read by ToString.
static bool isValueType;

static AsyncReactiveProperty() => isValueType = typeof(T).IsValueType;
|
|
|
|
/// <summary>
/// Pooled task source behind <c>WaitAsync</c>. An instance adds itself to the parent's
/// trigger event and is completed by the first handler callback it receives; reading
/// the result returns the instance to the pool.
/// </summary>
sealed class WaitAsyncSource : IUniTaskSource<T>, ITriggerHandler<T>, ITaskPoolNode<WaitAsyncSource>
{
    // Delegate reused for every cancellation registration.
    static Action<object> cancellationCallback = CancellationCallback;

    static TaskPool<WaitAsyncSource> pool;
    WaitAsyncSource nextNode;
    ref WaitAsyncSource ITaskPoolNode<WaitAsyncSource>.NextNode => ref nextNode;

    AsyncReactiveProperty<T> parent;
    CancellationToken cancellationToken;
    CancellationTokenRegistration cancellationTokenRegistration;
    UniTaskCompletionSourceCore<T> core;

    static WaitAsyncSource()
    {
        TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size);
    }

    // Instances are obtained through Create only.
    WaitAsyncSource()
    {
    }

    /// <summary>
    /// Takes a source from the pool (or allocates one), hooks it to <paramref name="parent"/>
    /// and to <paramref name="cancellationToken"/>, and reports the version for the task.
    /// An already-canceled token short-circuits to a canceled completion source.
    /// </summary>
    public static IUniTaskSource<T> Create(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken, out short token)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return AutoResetUniTaskCompletionSource<T>.CreateFromCanceled(cancellationToken, out token);
        }

        if (!pool.TryPop(out var source))
        {
            source = new WaitAsyncSource();
        }

        source.parent = parent;
        source.cancellationToken = cancellationToken;

        if (cancellationToken.CanBeCanceled)
        {
            source.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, source);
        }

        source.parent.triggerEvent.Add(source);

        TaskTracker.TrackActiveTask(source, 3);

        token = source.core.Version;
        return source;
    }

    // Clears all per-wait state, detaches from the parent and offers the instance back to the pool.
    bool TryReturn()
    {
        TaskTracker.RemoveTracking(this);
        core.Reset();
        cancellationTokenRegistration.Dispose();
        cancellationTokenRegistration = default;
        parent.triggerEvent.Remove(this);
        parent = null;
        cancellationToken = default;
        return pool.TryPush(this);
    }

    // Invoked by the cancellation registration; state is the WaitAsyncSource itself.
    static void CancellationCallback(object state)
    {
        var source = (WaitAsyncSource)state;
        source.OnCanceled(source.cancellationToken);
    }

    // IUniTaskSource

    /// <summary>Reads the result (or rethrows) and always recycles this instance afterwards.</summary>
    public T GetResult(short token)
    {
        try
        {
            return core.GetResult(token);
        }
        finally
        {
            TryReturn();
        }
    }

    void IUniTaskSource.GetResult(short token) => GetResult(token);

    public void OnCompleted(Action<object> continuation, object state, short token) => core.OnCompleted(continuation, state, token);

    public UniTaskStatus GetStatus(short token) => core.GetStatus(token);

    public UniTaskStatus UnsafeGetStatus() => core.UnsafeGetStatus();

    // ITriggerHandler

    ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
    ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }

    public void OnCanceled(CancellationToken cancellationToken) => core.TrySetCanceled(cancellationToken);

    // Completion of the property is surfaced to the waiter as cancellation.
    public void OnCompleted() => core.TrySetCanceled(CancellationToken.None);

    public void OnError(Exception ex) => core.TrySetException(ex);

    public void OnNext(T value) => core.TrySetResult(value);
}
|
|
|
|
/// <summary>
/// Sequence wrapper around the parent property whose enumerators are created with
/// <c>publishCurrentValue: false</c>.
/// </summary>
sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
{
    readonly AsyncReactiveProperty<T> parent;

    public WithoutCurrentEnumerable(AsyncReactiveProperty<T> parent) => this.parent = parent;

    /// <summary>Creates an enumerator over the parent that does not start with its current value.</summary>
    public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        const bool publishCurrentValue = false;
        return new Enumerator(parent, cancellationToken, publishCurrentValue);
    }
}
|
|
|
|
/// <summary>
/// Async enumerator over values published by the parent property. The constructor adds
/// the instance to the parent's trigger event; <c>OnNext</c> then completes the pending
/// <c>MoveNextAsync</c> task with the published value.
/// </summary>
sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
{
    // Delegate reused for every cancellation registration.
    static Action<object> cancellationCallback = CancellationCallback;

    readonly AsyncReactiveProperty<T> parent;
    readonly CancellationToken cancellationToken;
    readonly CancellationTokenRegistration cancellationTokenRegistration;
    T value;
    bool isDisposed;
    // True while the next MoveNextAsync must yield the parent's current value instead of waiting.
    bool firstCall;

    /// <summary>Subscribes to <paramref name="parent"/> and, when possible, to cancellation.</summary>
    /// <param name="parent">Property whose values are enumerated.</param>
    /// <param name="cancellationToken">Token whose cancellation disposes this enumerator.</param>
    /// <param name="publishCurrentValue">
    /// When true, the first <c>MoveNextAsync</c> completes immediately with the parent's current value.
    /// </param>
    public Enumerator(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
    {
        this.parent = parent;
        this.cancellationToken = cancellationToken;
        this.firstCall = publishCurrentValue;

        parent.triggerEvent.Add(this);
        TaskTracker.TrackActiveTask(this, 3);

        if (cancellationToken.CanBeCanceled)
        {
            cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
        }
    }

    /// <summary>Value captured by the most recent successful <c>MoveNextAsync</c>.</summary>
    public T Current => value;

    ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
    ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }

    /// <summary>
    /// Yields the parent's current value on the first call (when requested at construction);
    /// otherwise returns a task completed by the next handler callback. If the enumerator
    /// has already been disposed or canceled, the returned task is canceled.
    /// </summary>
    public UniTask<bool> MoveNextAsync()
    {
        // raise latest value on first call.
        if (firstCall)
        {
            firstCall = false;
            value = parent.Value;
            return CompletedTasks.True;
        }

        completionSource.Reset();

        if (isDisposed)
        {
            // DisposeAsync already removed this handler from the trigger event, so nothing
            // could ever complete the freshly reset source; finish it as canceled instead
            // of handing out a task that never completes.
            completionSource.TrySetCanceled(cancellationToken);
        }

        return new UniTask<bool>(this, completionSource.Version);
    }

    /// <summary>
    /// Idempotently tears the enumerator down: stops tracking, releases the cancellation
    /// registration, cancels a pending <c>MoveNextAsync</c> and unsubscribes from the parent.
    /// </summary>
    public UniTask DisposeAsync()
    {
        if (!isDisposed)
        {
            isDisposed = true;
            TaskTracker.RemoveTracking(this);
            // Release the callback held by the token source; without this the registration
            // (and this enumerator) stays referenced for the lifetime of the token source.
            cancellationTokenRegistration.Dispose();
            completionSource.TrySetCanceled(cancellationToken);
            parent.triggerEvent.Remove(this);
        }
        return default;
    }

    public void OnNext(T value)
    {
        this.value = value;
        completionSource.TrySetResult(true);
    }

    public void OnCanceled(CancellationToken cancellationToken)
    {
        DisposeAsync().Forget();
    }

    // Completion of the property ends the sequence: a pending MoveNextAsync yields false.
    public void OnCompleted()
    {
        completionSource.TrySetResult(false);
    }

    public void OnError(Exception ex)
    {
        completionSource.TrySetException(ex);
    }

    // Invoked by the cancellation registration; state is the Enumerator itself.
    static void CancellationCallback(object state)
    {
        var self = (Enumerator)state;
        self.DisposeAsync().Forget();
    }
}
|
|
}
|
|
|
|
public class ReadOnlyAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>, IDisposable
|
|
{
|
|
// Handler list used to publish values and completion to enumerators and WaitAsync sources.
TriggerEvent<T> triggerEvent;

// Most recent value taken from the source (or the initial value supplied at construction).
T latestValue;

// Enumerator over the source while ConsumeEnumerator is running; reset to null when it finishes.
IUniTaskAsyncEnumerator<T> enumerator;
|
|
|
|
/// <summary>Gets the most recent value stored by this property.</summary>
public T Value => latestValue;
|
|
|
|
/// <summary>
/// Creates the property with <paramref name="initialValue"/> as its value and immediately
/// starts consuming <paramref name="source"/> in a fire-and-forget loop.
/// </summary>
/// <param name="initialValue">Value reported until the source yields one.</param>
/// <param name="source">Sequence whose items become the property's value.</param>
/// <param name="cancellationToken">Token passed to the source's enumerator.</param>
public ReadOnlyAsyncReactiveProperty(T initialValue, IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    this.latestValue = initialValue;

    var consumeLoop = ConsumeEnumerator(source, cancellationToken);
    consumeLoop.Forget();
}
|
|
|
|
/// <summary>
/// Creates the property without an initial value (the stored value stays
/// <c>default(T)</c> until the source yields) and immediately starts consuming
/// <paramref name="source"/> in a fire-and-forget loop.
/// </summary>
/// <param name="source">Sequence whose items become the property's value.</param>
/// <param name="cancellationToken">Token passed to the source's enumerator.</param>
public ReadOnlyAsyncReactiveProperty(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    var consumeLoop = ConsumeEnumerator(source, cancellationToken);
    consumeLoop.Forget();
}
|
|
|
|
/// <summary>
/// Pulls items from <paramref name="source"/> until it ends, storing each one as the
/// latest value and publishing it through the trigger event. The source enumerator is
/// disposed and the field cleared when the loop exits, whether normally or by exception.
/// </summary>
async UniTaskVoid ConsumeEnumerator(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    enumerator = source.GetAsyncEnumerator(cancellationToken);
    try
    {
        while (await enumerator.MoveNextAsync())
        {
            var current = enumerator.Current;
            latestValue = current;
            triggerEvent.SetResult(current);
        }
    }
    finally
    {
        await enumerator.DisposeAsync();
        enumerator = null;
    }
}
|
|
|
|
/// <summary>
/// Returns a sequence whose enumerators are created with <c>publishCurrentValue: false</c>,
/// i.e. they do not start by yielding the value held at enumeration time.
/// </summary>
public IUniTaskAsyncEnumerable<T> WithoutCurrent() => new WithoutCurrentEnumerable(this);
|
|
|
|
/// <summary>
/// Creates an enumerator whose first <c>MoveNextAsync</c> yields the current value and
/// whose later calls wait for newly published values.
/// </summary>
/// <param name="cancellationToken">Token handed to the enumerator for cancellation.</param>
public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
{
    const bool publishCurrentValue = true;
    return new Enumerator(this, cancellationToken, publishCurrentValue);
}
|
|
|
|
/// <summary>
/// Disposes the source enumerator if the consume loop is still running (fire-and-forget),
/// then signals completion through the trigger event.
/// </summary>
public void Dispose()
{
    enumerator?.DisposeAsync().Forget();

    triggerEvent.SetCompleted();
}
|
|
|
|
/// <summary>Unwraps the property to its current <c>Value</c>.</summary>
/// <param name="value">Property to read; dereferenced without a null check.</param>
public static implicit operator T(ReadOnlyAsyncReactiveProperty<T> value) => value.Value;
|
|
|
|
/// <summary>
/// Returns the string form of the latest value. Value types are formatted directly;
/// for reference types a null value yields null instead of throwing.
/// </summary>
public override string ToString()
{
    return isValueType
        ? latestValue.ToString()
        : latestValue?.ToString();
}
|
|
|
|
/// <summary>
/// Returns a task backed by a <c>WaitAsyncSource</c> registered on this property;
/// that source completes with the next value delivered to its <c>OnNext</c>.
/// </summary>
/// <param name="cancellationToken">Token used to cancel the wait.</param>
public UniTask<T> WaitAsync(CancellationToken cancellationToken = default)
{
    var source = WaitAsyncSource.Create(this, cancellationToken, out var version);
    return new UniTask<T>(source, version);
}
|
|
|
|
// Whether T is a value type; computed once per closed generic type and read by ToString.
static bool isValueType;

static ReadOnlyAsyncReactiveProperty() => isValueType = typeof(T).IsValueType;
|
|
|
|
/// <summary>
/// Pooled task source behind <c>WaitAsync</c>. An instance adds itself to the parent's
/// trigger event and is completed by the first handler callback it receives; reading
/// the result returns the instance to the pool.
/// </summary>
sealed class WaitAsyncSource : IUniTaskSource<T>, ITriggerHandler<T>, ITaskPoolNode<WaitAsyncSource>
{
    // Delegate reused for every cancellation registration.
    static Action<object> cancellationCallback = CancellationCallback;

    static TaskPool<WaitAsyncSource> pool;
    WaitAsyncSource nextNode;
    ref WaitAsyncSource ITaskPoolNode<WaitAsyncSource>.NextNode => ref nextNode;

    ReadOnlyAsyncReactiveProperty<T> parent;
    CancellationToken cancellationToken;
    CancellationTokenRegistration cancellationTokenRegistration;
    UniTaskCompletionSourceCore<T> core;

    static WaitAsyncSource()
    {
        TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size);
    }

    // Instances are obtained through Create only.
    WaitAsyncSource()
    {
    }

    /// <summary>
    /// Takes a source from the pool (or allocates one), hooks it to <paramref name="parent"/>
    /// and to <paramref name="cancellationToken"/>, and reports the version for the task.
    /// An already-canceled token short-circuits to a canceled completion source.
    /// </summary>
    public static IUniTaskSource<T> Create(ReadOnlyAsyncReactiveProperty<T> parent, CancellationToken cancellationToken, out short token)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return AutoResetUniTaskCompletionSource<T>.CreateFromCanceled(cancellationToken, out token);
        }

        if (!pool.TryPop(out var source))
        {
            source = new WaitAsyncSource();
        }

        source.parent = parent;
        source.cancellationToken = cancellationToken;

        if (cancellationToken.CanBeCanceled)
        {
            source.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, source);
        }

        source.parent.triggerEvent.Add(source);

        TaskTracker.TrackActiveTask(source, 3);

        token = source.core.Version;
        return source;
    }

    // Clears all per-wait state, detaches from the parent and offers the instance back to the pool.
    bool TryReturn()
    {
        TaskTracker.RemoveTracking(this);
        core.Reset();
        cancellationTokenRegistration.Dispose();
        cancellationTokenRegistration = default;
        parent.triggerEvent.Remove(this);
        parent = null;
        cancellationToken = default;
        return pool.TryPush(this);
    }

    // Invoked by the cancellation registration; state is the WaitAsyncSource itself.
    static void CancellationCallback(object state)
    {
        var source = (WaitAsyncSource)state;
        source.OnCanceled(source.cancellationToken);
    }

    // IUniTaskSource

    /// <summary>Reads the result (or rethrows) and always recycles this instance afterwards.</summary>
    public T GetResult(short token)
    {
        try
        {
            return core.GetResult(token);
        }
        finally
        {
            TryReturn();
        }
    }

    void IUniTaskSource.GetResult(short token) => GetResult(token);

    public void OnCompleted(Action<object> continuation, object state, short token) => core.OnCompleted(continuation, state, token);

    public UniTaskStatus GetStatus(short token) => core.GetStatus(token);

    public UniTaskStatus UnsafeGetStatus() => core.UnsafeGetStatus();

    // ITriggerHandler

    ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
    ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }

    public void OnCanceled(CancellationToken cancellationToken) => core.TrySetCanceled(cancellationToken);

    // Completion of the property is surfaced to the waiter as cancellation.
    public void OnCompleted() => core.TrySetCanceled(CancellationToken.None);

    public void OnError(Exception ex) => core.TrySetException(ex);

    public void OnNext(T value) => core.TrySetResult(value);
}
|
|
|
|
/// <summary>
/// Sequence wrapper around the parent property whose enumerators are created with
/// <c>publishCurrentValue: false</c>.
/// </summary>
sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
{
    readonly ReadOnlyAsyncReactiveProperty<T> parent;

    public WithoutCurrentEnumerable(ReadOnlyAsyncReactiveProperty<T> parent) => this.parent = parent;

    /// <summary>Creates an enumerator over the parent that does not start with its current value.</summary>
    public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        const bool publishCurrentValue = false;
        return new Enumerator(parent, cancellationToken, publishCurrentValue);
    }
}
|
|
|
|
/// <summary>
/// Async enumerator over values published by the parent property. The constructor adds
/// the instance to the parent's trigger event; <c>OnNext</c> then completes the pending
/// <c>MoveNextAsync</c> task with the published value.
/// </summary>
sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
{
    // Delegate reused for every cancellation registration.
    static Action<object> cancellationCallback = CancellationCallback;

    readonly ReadOnlyAsyncReactiveProperty<T> parent;
    readonly CancellationToken cancellationToken;
    readonly CancellationTokenRegistration cancellationTokenRegistration;
    T value;
    bool isDisposed;
    // True while the next MoveNextAsync must yield the parent's current value instead of waiting.
    bool firstCall;

    /// <summary>Subscribes to <paramref name="parent"/> and, when possible, to cancellation.</summary>
    /// <param name="parent">Property whose values are enumerated.</param>
    /// <param name="cancellationToken">Token whose cancellation disposes this enumerator.</param>
    /// <param name="publishCurrentValue">
    /// When true, the first <c>MoveNextAsync</c> completes immediately with the parent's current value.
    /// </param>
    public Enumerator(ReadOnlyAsyncReactiveProperty<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
    {
        this.parent = parent;
        this.cancellationToken = cancellationToken;
        this.firstCall = publishCurrentValue;

        parent.triggerEvent.Add(this);
        TaskTracker.TrackActiveTask(this, 3);

        if (cancellationToken.CanBeCanceled)
        {
            cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
        }
    }

    /// <summary>Value captured by the most recent successful <c>MoveNextAsync</c>.</summary>
    public T Current => value;

    ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
    ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }

    /// <summary>
    /// Yields the parent's current value on the first call (when requested at construction);
    /// otherwise returns a task completed by the next handler callback. If the enumerator
    /// has already been disposed or canceled, the returned task is canceled.
    /// </summary>
    public UniTask<bool> MoveNextAsync()
    {
        // raise latest value on first call.
        if (firstCall)
        {
            firstCall = false;
            value = parent.Value;
            return CompletedTasks.True;
        }

        completionSource.Reset();

        if (isDisposed)
        {
            // DisposeAsync already removed this handler from the trigger event, so nothing
            // could ever complete the freshly reset source; finish it as canceled instead
            // of handing out a task that never completes.
            completionSource.TrySetCanceled(cancellationToken);
        }

        return new UniTask<bool>(this, completionSource.Version);
    }

    /// <summary>
    /// Idempotently tears the enumerator down: stops tracking, releases the cancellation
    /// registration, cancels a pending <c>MoveNextAsync</c> and unsubscribes from the parent.
    /// </summary>
    public UniTask DisposeAsync()
    {
        if (!isDisposed)
        {
            isDisposed = true;
            TaskTracker.RemoveTracking(this);
            // Release the callback held by the token source; without this the registration
            // (and this enumerator) stays referenced for the lifetime of the token source.
            cancellationTokenRegistration.Dispose();
            completionSource.TrySetCanceled(cancellationToken);
            parent.triggerEvent.Remove(this);
        }
        return default;
    }

    public void OnNext(T value)
    {
        this.value = value;
        completionSource.TrySetResult(true);
    }

    public void OnCanceled(CancellationToken cancellationToken)
    {
        DisposeAsync().Forget();
    }

    // Completion of the property ends the sequence: a pending MoveNextAsync yields false.
    public void OnCompleted()
    {
        completionSource.TrySetResult(false);
    }

    public void OnError(Exception ex)
    {
        completionSource.TrySetException(ex);
    }

    // Invoked by the cancellation registration; state is the Enumerator itself.
    static void CancellationCallback(object state)
    {
        var self = (Enumerator)state;
        self.DisposeAsync().Forget();
    }
}
|
|
}
|
|
|
|
/// <summary>
/// Extension methods that wrap an <see cref="IUniTaskAsyncEnumerable{T}"/> in a
/// <see cref="ReadOnlyAsyncReactiveProperty{T}"/>.
/// </summary>
public static class StateExtensions
{
    /// <summary>
    /// Creates a read-only reactive property fed by <paramref name="source"/>, with no
    /// initial value supplied.
    /// </summary>
    /// <param name="source">Sequence whose items become the property's value.</param>
    /// <param name="cancellationToken">Token forwarded to the property's constructor.</param>
    public static ReadOnlyAsyncReactiveProperty<T> ToReadOnlyAsyncReactiveProperty<T>(this IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
        => new ReadOnlyAsyncReactiveProperty<T>(source, cancellationToken);

    /// <summary>
    /// Creates a read-only reactive property fed by <paramref name="source"/> that reports
    /// <paramref name="initialValue"/> until the source yields an item.
    /// </summary>
    /// <param name="source">Sequence whose items become the property's value.</param>
    /// <param name="initialValue">Value held before the first item arrives.</param>
    /// <param name="cancellationToken">Token forwarded to the property's constructor.</param>
    public static ReadOnlyAsyncReactiveProperty<T> ToReadOnlyAsyncReactiveProperty<T>(this IUniTaskAsyncEnumerable<T> source, T initialValue, CancellationToken cancellationToken)
        => new ReadOnlyAsyncReactiveProperty<T>(initialValue, source, cancellationToken);
}
|
|
} |