Skip to content

Commit

Permalink
Add count and remove to CompositeDisposable
Browse files Browse the repository at this point in the history
  • Loading branch information
lempiji committed Jun 13, 2020
1 parent e2040b3 commit d997dc9
Showing 1 changed file with 111 additions and 4 deletions.
115 changes: 111 additions & 4 deletions source/rx/disposable.d
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,8 @@ unittest
///
class CompositeDisposable : Disposable
{
private enum ShrinkThreshold = 64;

public:
///
this(Disposable[] disposables...)
Expand All @@ -568,6 +570,12 @@ public:
}

public:
///
size_t count() const nothrow @nogc @property
{
return atomicLoad(_count);
}

///
void dispose()
{
Expand All @@ -579,33 +587,39 @@ public:
_disposed = true;
currentDisposables = _disposables;
_disposables = [];
_count = 0;
}
}

if (currentDisposables)
{
foreach (d; currentDisposables)
{
d.dispose();
if (d)
d.dispose();
}
}
}

///
void clear()
{
Disposable[] currentDisposables;
synchronized (_gate)
{
currentDisposables = _disposables;
_disposables = [];
_count = 0;
}

foreach (d; currentDisposables)
{
d.dispose();
if (d)
d.dispose();
}
}

///
void insert(Disposable item)
{
assert(item !is null);
Expand All @@ -618,6 +632,7 @@ public:
{
_disposables ~= item;
}
atomicOp!"+="(_count, 1);
}

if (shouldDispose)
Expand All @@ -626,20 +641,64 @@ public:
}
}

///
bool remove(Disposable item)
{
assert(item !is null);

synchronized (_gate)
{
auto current = _disposables;

import std.algorithm : countUntil;

auto i = countUntil(current, item);
if (i < 0)
{
// not found, just return
return false;
}

current[i] = null;
const cap = current.capacity;
if (cap > ShrinkThreshold && _count < cap / 2)
{
Disposable[] fresh;
fresh.reserve(cap / 2);

foreach (d; current)
{
if (d !is null)
{
fresh ~= d;
}
}

_disposables = fresh;
}
atomicOp!"-="(_count, 1);
return true;
}
}

private:
Disposable[] _disposables;
bool _disposed;
Object _gate;
Disposable[] _disposables;
shared(bool) _disposed;
shared(size_t) _count;
}

///
unittest
{
auto d1 = new SingleAssignmentDisposable;
auto d2 = new SerialDisposable;
auto d = new CompositeDisposable(d1, d2);
d.dispose();
assert(d.count == 0);
}

///
unittest
{
auto composite = new CompositeDisposable;
Expand All @@ -650,6 +709,7 @@ unittest
assert(disposed);
}

///
unittest
{
auto composite = new CompositeDisposable;
Expand All @@ -663,6 +723,7 @@ unittest
assert(_count == 1);
}

///
unittest
{
auto composite = new CompositeDisposable;
Expand All @@ -674,6 +735,52 @@ unittest
assert(disposed);
}

///
unittest
{
auto composite = new CompositeDisposable;
bool disposed = false;
auto disposable = new AnonymousDisposable({ disposed = true; });
composite.insert(disposable);
composite.remove(disposable);
composite.dispose();
assert(!disposed);
}

///
unittest
{
auto composite = new CompositeDisposable;
Disposable[] ds;
size_t disposedCount = 0;
foreach (_; 0 .. 100)
{
auto temp = new AnonymousDisposable({ disposedCount++; });
composite.insert(temp);
ds ~= temp;
}
foreach (i; 0 .. 80)
{
composite.remove(ds[i]);
}
assert(composite.count == 20);
composite.dispose();
assert(composite.count == 0);
assert(disposedCount == 20);
}

///
unittest
{
auto composite = new CompositeDisposable;
auto disposed = false;
auto item = new AnonymousDisposable({ disposed = true; });
composite.insert(item);
composite.remove(item);
composite.clear();
assert(!disposed);
}

///
class AnonymousDisposable : Disposable
{
Expand Down

0 comments on commit d997dc9

Please sign in to comment.