#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member using System; using System.Runtime.ExceptionServices; using System.Threading; using Cysharp.Threading.Tasks.Internal; namespace Cysharp.Threading.Tasks { public static class UniTaskObservableExtensions { public static UniTask ToUniTask(this IObservable source, bool useFirstValue = false, CancellationToken cancellationToken = default) { var promise = new UniTaskCompletionSource(); var disposable = new SingleAssignmentDisposable(); var observer = useFirstValue ? (IObserver)new FirstValueToUniTaskObserver(promise, disposable, cancellationToken) : (IObserver)new ToUniTaskObserver(promise, disposable, cancellationToken); try { disposable.Disposable = source.Subscribe(observer); } catch (Exception ex) { promise.TrySetException(ex); } return promise.Task; } public static IObservable ToObservable(this UniTask task) { if (task.Status.IsCompleted()) { try { return new ReturnObservable(task.GetAwaiter().GetResult()); } catch (Exception ex) { return new ThrowObservable(ex); } } var subject = new AsyncSubject(); Fire(subject, task).Forget(); return subject; } /// /// Ideally returns IObservabl[Unit] is best but Cysharp.Threading.Tasks does not have Unit so return AsyncUnit instead. /// public static IObservable ToObservable(this UniTask task) { if (task.Status.IsCompleted()) { try { task.GetAwaiter().GetResult(); return new ReturnObservable(AsyncUnit.Default); } catch (Exception ex) { return new ThrowObservable(ex); } } var subject = new AsyncSubject(); Fire(subject, task).Forget(); return subject; } static async UniTaskVoid Fire(AsyncSubject subject, UniTask task) { T value; try { value = await task; } catch (Exception ex) { subject.OnError(ex); return; } subject.OnNext(value); subject.OnCompleted(); } static async UniTaskVoid Fire(AsyncSubject subject, UniTask task) { try { await task; } catch (Exception ex) { subject.OnError(ex); return; } subject.OnNext(AsyncUnit.Default); subject.OnCompleted(); } class ToUniTaskObserver : IObserver { static readonly Action callback = OnCanceled; readonly UniTaskCompletionSource promise; readonly SingleAssignmentDisposable disposable; readonly CancellationToken cancellationToken; readonly CancellationTokenRegistration registration; bool hasValue; T latestValue; public ToUniTaskObserver(UniTaskCompletionSource promise, SingleAssignmentDisposable disposable, CancellationToken cancellationToken) { this.promise = promise; this.disposable = disposable; this.cancellationToken = cancellationToken; if (this.cancellationToken.CanBeCanceled) { this.registration = this.cancellationToken.RegisterWithoutCaptureExecutionContext(callback, this); } } static void OnCanceled(object state) { var self = (ToUniTaskObserver)state; self.disposable.Dispose(); self.promise.TrySetCanceled(self.cancellationToken); } public void OnNext(T value) { hasValue = true; latestValue = value; } public void OnError(Exception error) { try { promise.TrySetException(error); } finally { registration.Dispose(); disposable.Dispose(); } } public void OnCompleted() { try { if (hasValue) { promise.TrySetResult(latestValue); } else { promise.TrySetException(new InvalidOperationException("Sequence has no elements")); } } finally { registration.Dispose(); disposable.Dispose(); } } } class FirstValueToUniTaskObserver : IObserver { static readonly Action callback = OnCanceled; readonly UniTaskCompletionSource promise; readonly SingleAssignmentDisposable disposable; readonly CancellationToken cancellationToken; readonly CancellationTokenRegistration registration; bool hasValue; public FirstValueToUniTaskObserver(UniTaskCompletionSource promise, SingleAssignmentDisposable disposable, CancellationToken cancellationToken) { this.promise = promise; this.disposable = disposable; this.cancellationToken = cancellationToken; if (this.cancellationToken.CanBeCanceled) { this.registration = this.cancellationToken.RegisterWithoutCaptureExecutionContext(callback, this); } } static void OnCanceled(object state) { var self = (FirstValueToUniTaskObserver)state; self.disposable.Dispose(); self.promise.TrySetCanceled(self.cancellationToken); } public void OnNext(T value) { hasValue = true; try { promise.TrySetResult(value); } finally { registration.Dispose(); disposable.Dispose(); } } public void OnError(Exception error) { try { promise.TrySetException(error); } finally { registration.Dispose(); disposable.Dispose(); } } public void OnCompleted() { try { if (!hasValue) { promise.TrySetException(new InvalidOperationException("Sequence has no elements")); } } finally { registration.Dispose(); disposable.Dispose(); } } } class ReturnObservable : IObservable { readonly T value; public ReturnObservable(T value) { this.value = value; } public IDisposable Subscribe(IObserver observer) { observer.OnNext(value); observer.OnCompleted(); return EmptyDisposable.Instance; } } class ThrowObservable : IObservable { readonly Exception value; public ThrowObservable(Exception value) { this.value = value; } public IDisposable Subscribe(IObserver observer) { observer.OnError(value); return EmptyDisposable.Instance; } } } } namespace Cysharp.Threading.Tasks.Internal { // Bridges for Rx. internal class EmptyDisposable : IDisposable { public static EmptyDisposable Instance = new EmptyDisposable(); EmptyDisposable() { } public void Dispose() { } } internal sealed class SingleAssignmentDisposable : IDisposable { readonly object gate = new object(); IDisposable current; bool disposed; public bool IsDisposed { get { lock (gate) { return disposed; } } } public IDisposable Disposable { get { return current; } set { var old = default(IDisposable); bool alreadyDisposed; lock (gate) { alreadyDisposed = disposed; old = current; if (!alreadyDisposed) { if (value == null) return; current = value; } } if (alreadyDisposed && value != null) { value.Dispose(); return; } if (old != null) throw new InvalidOperationException("Disposable is already set"); } } public void Dispose() { IDisposable old = null; lock (gate) { if (!disposed) { disposed = true; old = current; current = null; } } if (old != null) old.Dispose(); } } internal sealed class AsyncSubject : IObservable, IObserver { object observerLock = new object(); T lastValue; bool hasValue; bool isStopped; bool isDisposed; Exception lastError; IObserver outObserver = EmptyObserver.Instance; public T Value { get { ThrowIfDisposed(); if (!isStopped) throw new InvalidOperationException("AsyncSubject is not completed yet"); if (lastError != null) ExceptionDispatchInfo.Capture(lastError).Throw(); return lastValue; } } public bool HasObservers { get { return !(outObserver is EmptyObserver) && !isStopped && !isDisposed; } } public bool IsCompleted { get { return isStopped; } } public void OnCompleted() { IObserver old; T v; bool hv; lock (observerLock) { ThrowIfDisposed(); if (isStopped) return; old = outObserver; outObserver = EmptyObserver.Instance; isStopped = true; v = lastValue; hv = hasValue; } if (hv) { old.OnNext(v); old.OnCompleted(); } else { old.OnCompleted(); } } public void OnError(Exception error) { if (error == null) throw new ArgumentNullException("error"); IObserver old; lock (observerLock) { ThrowIfDisposed(); if (isStopped) return; old = outObserver; outObserver = EmptyObserver.Instance; isStopped = true; lastError = error; } old.OnError(error); } public void OnNext(T value) { lock (observerLock) { ThrowIfDisposed(); if (isStopped) return; this.hasValue = true; this.lastValue = value; } } public IDisposable Subscribe(IObserver observer) { if (observer == null) throw new ArgumentNullException("observer"); var ex = default(Exception); var v = default(T); var hv = false; lock (observerLock) { ThrowIfDisposed(); if (!isStopped) { var listObserver = outObserver as ListObserver; if (listObserver != null) { outObserver = listObserver.Add(observer); } else { var current = outObserver; if (current is EmptyObserver) { outObserver = observer; } else { outObserver = new ListObserver(new ImmutableList>(new[] { current, observer })); } } return new Subscription(this, observer); } ex = lastError; v = lastValue; hv = hasValue; } if (ex != null) { observer.OnError(ex); } else if (hv) { observer.OnNext(v); observer.OnCompleted(); } else { observer.OnCompleted(); } return EmptyDisposable.Instance; } public void Dispose() { lock (observerLock) { isDisposed = true; outObserver = DisposedObserver.Instance; lastError = null; lastValue = default(T); } } void ThrowIfDisposed() { if (isDisposed) throw new ObjectDisposedException(""); } class Subscription : IDisposable { readonly object gate = new object(); AsyncSubject parent; IObserver unsubscribeTarget; public Subscription(AsyncSubject parent, IObserver unsubscribeTarget) { this.parent = parent; this.unsubscribeTarget = unsubscribeTarget; } public void Dispose() { lock (gate) { if (parent != null) { lock (parent.observerLock) { var listObserver = parent.outObserver as ListObserver; if (listObserver != null) { parent.outObserver = listObserver.Remove(unsubscribeTarget); } else { parent.outObserver = EmptyObserver.Instance; } unsubscribeTarget = null; parent = null; } } } } } } internal class ListObserver : IObserver { private readonly ImmutableList> _observers; public ListObserver(ImmutableList> observers) { _observers = observers; } public void OnCompleted() { var targetObservers = _observers.Data; for (int i = 0; i < targetObservers.Length; i++) { targetObservers[i].OnCompleted(); } } public void OnError(Exception error) { var targetObservers = _observers.Data; for (int i = 0; i < targetObservers.Length; i++) { targetObservers[i].OnError(error); } } public void OnNext(T value) { var targetObservers = _observers.Data; for (int i = 0; i < targetObservers.Length; i++) { targetObservers[i].OnNext(value); } } internal IObserver Add(IObserver observer) { return new ListObserver(_observers.Add(observer)); } internal IObserver Remove(IObserver observer) { var i = Array.IndexOf(_observers.Data, observer); if (i < 0) return this; if (_observers.Data.Length == 2) { return _observers.Data[1 - i]; } else { return new ListObserver(_observers.Remove(observer)); } } } internal class EmptyObserver : IObserver { public static readonly EmptyObserver Instance = new EmptyObserver(); EmptyObserver() { } public void OnCompleted() { } public void OnError(Exception error) { } public void OnNext(T value) { } } internal class ThrowObserver : IObserver { public static readonly ThrowObserver Instance = new ThrowObserver(); ThrowObserver() { } public void OnCompleted() { } public void OnError(Exception error) { ExceptionDispatchInfo.Capture(error).Throw(); } public void OnNext(T value) { } } internal class DisposedObserver : IObserver { public static readonly DisposedObserver Instance = new DisposedObserver(); DisposedObserver() { } public void OnCompleted() { throw new ObjectDisposedException(""); } public void OnError(Exception error) { throw new ObjectDisposedException(""); } public void OnNext(T value) { throw new ObjectDisposedException(""); } } internal class ImmutableList { public static readonly ImmutableList Empty = new ImmutableList(); T[] data; public T[] Data { get { return data; } } ImmutableList() { data = new T[0]; } public ImmutableList(T[] data) { this.data = data; } public ImmutableList Add(T value) { var newData = new T[data.Length + 1]; Array.Copy(data, newData, data.Length); newData[data.Length] = value; return new ImmutableList(newData); } public ImmutableList Remove(T value) { var i = IndexOf(value); if (i < 0) return this; var length = data.Length; if (length == 1) return Empty; var newData = new T[length - 1]; Array.Copy(data, 0, newData, 0, i); Array.Copy(data, i + 1, newData, i, length - i - 1); return new ImmutableList(newData); } public int IndexOf(T value) { for (var i = 0; i < data.Length; ++i) { // ImmutableList only use for IObserver(no worry for boxed) if (object.Equals(data[i], value)) return i; } return -1; } } }