using Discord.Logging;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Discord.Audio.Streams
{
/// <summary> Wraps another stream with a timed buffer. </summary>
public class BufferedWriteStream : AudioOutStream
{
// Upper bound on consecutive silence frames written during an underrun before
// the sender switches to clearing the speaking flag instead.
private const int MaxSilenceFrames = 10;

/// <summary> One queued unit of audio: a buffer plus the number of valid bytes in it. </summary>
private struct Frame
{
    /// <summary> Backing array holding the frame payload. </summary>
    public readonly byte[] Buffer;

    /// <summary> Count of meaningful bytes at the start of <see cref="Buffer"/>. </summary>
    public readonly int Bytes;

    public Frame(byte[] buffer, int bytes)
    {
        Bytes = bytes;
        Buffer = buffer;
    }
}
// Zero-length payload written downstream in place of audio while the queue is empty.
private static readonly byte[] _silenceFrame = new byte[0];

private readonly AudioClient _client;
private readonly AudioStream _next;
private readonly CancellationTokenSource _disposeTokenSource, _cancelTokenSource;
private readonly CancellationToken _cancelToken;
private readonly Task _task;
// Frames waiting to be sent, in write order.
private readonly ConcurrentQueue<Frame> _queuedFrames;
// Pre-allocated buffers that are recycled between writes.
private readonly ConcurrentQueue<byte[]> _bufferPool;
// One permit per free queue slot; writers wait on it when the queue is full.
private readonly SemaphoreSlim _queueLock;
private readonly Logger _logger;
private readonly int _ticksPerFrame, _queueLength;
private bool _isPreloaded;
private int _silenceFrames;

/// <summary> Creates a buffered stream that forwards frames to <paramref name="next"/> on a fixed cadence. </summary>
/// <param name="next"> The stream that receives the paced frames. </param>
/// <param name="client">
///     The owning audio client. It is converted with <c>as AudioClient</c>, so an instance of any
///     other implementation is passed on as <c>null</c>.
/// </param>
/// <param name="bufferMillis"> Amount of audio to buffer, in milliseconds; rounded up to whole frames. </param>
/// <param name="cancelToken"> Token that stops the background sender and pending writes. </param>
/// <param name="maxFrameSize"> Size, in bytes, of each pooled frame buffer. </param>
/// <exception cref="ArgumentOutOfRangeException"> <paramref name="bufferMillis"/> does not yield at least one frame. </exception>
public BufferedWriteStream(AudioStream next, IAudioClient client, int bufferMillis, CancellationToken cancelToken, int maxFrameSize = 1500)
    : this(next, client as AudioClient, bufferMillis, cancelToken, null, maxFrameSize) { }

/// <summary> Creates a buffered stream with an optional logger and starts the background sender. </summary>
/// <exception cref="ArgumentOutOfRangeException"> <paramref name="bufferMillis"/> does not yield at least one frame. </exception>
internal BufferedWriteStream(AudioStream next, AudioClient client, int bufferMillis, CancellationToken cancelToken, Logger logger, int maxFrameSize = 1500)
{
    //maxFrameSize = 1275 was too limiting at 128kbps,2ch,60ms
    _next = next;
    _client = client;
    _ticksPerFrame = OpusEncoder.FrameMillis;
    _logger = logger;
    _queueLength = (bufferMillis + (_ticksPerFrame - 1)) / _ticksPerFrame; //Round up

    // A non-positive length would be rejected by the SemaphoreSlim constructor below anyway;
    // failing here reports the real culprit and avoids leaking the token sources.
    if (_queueLength <= 0)
        throw new ArgumentOutOfRangeException(nameof(bufferMillis), bufferMillis, "The buffer must hold at least one frame.");

    _disposeTokenSource = new CancellationTokenSource();
    _cancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_disposeTokenSource.Token, cancelToken);
    _cancelToken = _cancelTokenSource.Token;

    _queuedFrames = new ConcurrentQueue<Frame>();
    _bufferPool = new ConcurrentQueue<byte[]>();
    for (int i = 0; i < _queueLength; i++)
        _bufferPool.Enqueue(new byte[maxFrameSize]);
    _queueLock = new SemaphoreSlim(_queueLength, _queueLength);

    // Start "saturated" so no silence frames are emitted before the first real frame.
    _silenceFrames = MaxSilenceFrames;

    _task = Run();
}
// Set once the managed resources have been released, so repeated Dispose calls are harmless.
private bool _isDisposed;

/// <summary>
///     Cancels the background sender and releases the token sources and the queue semaphore.
/// </summary>
/// <param name="disposing"> <c>true</c> when called from <c>Dispose()</c>; managed resources are only released in that case. </param>
protected override void Dispose(bool disposing)
{
    // Cancel() on an already-disposed CancellationTokenSource throws ObjectDisposedException,
    // so the managed cleanup must run at most once.
    if (disposing && !_isDisposed)
    {
        _isDisposed = true;
        _disposeTokenSource?.Cancel();
        _disposeTokenSource?.Dispose();
        _cancelTokenSource?.Dispose();
        _queueLock?.Dispose();
    }
    base.Dispose(disposing);
}
/// <summary>
///     Starts the background sender. Once the queue has been preloaded it forwards one frame to the
///     next stream every <c>_ticksPerFrame</c> milliseconds, writing a limited number of silence
///     frames when the queue runs dry.
/// </summary>
/// <returns> The task running the send loop; it ends when the cancel token is signalled. </returns>
private Task Run()
{
    return Task.Run(async () =>
    {
        try
        {
            // Hold off until the queue has been flagged as preloaded (or we are cancelled).
            while (!_isPreloaded && !_cancelToken.IsCancellationRequested)
                await Task.Delay(1).ConfigureAwait(false);

            // Pace against a monotonic stopwatch. Environment.TickCount is a 32-bit counter that
            // wraps around; comparing it with a 64-bit deadline breaks the schedule at the wrap.
            var clock = System.Diagnostics.Stopwatch.StartNew();
            long nextTick = clock.ElapsedMilliseconds;
            ushort seq = 0;
            uint timestamp = 0;
            while (!_cancelToken.IsCancellationRequested)
            {
                long tick = clock.ElapsedMilliseconds;
                long dist = nextTick - tick;
                if (dist <= 0)
                {
                    if (_queuedFrames.TryDequeue(out Frame frame))
                    {
                        await _client.SetSpeakingAsync(true).ConfigureAwait(false);
                        _next.WriteHeader(seq, timestamp, false);
                        await _next.WriteAsync(frame.Buffer, 0, frame.Bytes).ConfigureAwait(false);

                        // Recycle the buffer before freeing the slot so a waiting writer always finds one.
                        _bufferPool.Enqueue(frame.Buffer);
                        _queueLock.Release();

                        nextTick += _ticksPerFrame;
                        seq++;
                        timestamp += OpusEncoder.FrameSamplesPerChannel;
                        _silenceFrames = 0;
#if DEBUG
                        var _ = _logger?.DebugAsync($"Sent {frame.Bytes} bytes ({_queuedFrames.Count} frames buffered)");
#endif
                    }
                    else
                    {
                        // Underrun: account for every frame slot that is already due.
                        while ((nextTick - tick) <= 0)
                        {
                            if (_silenceFrames++ < MaxSilenceFrames)
                            {
                                _next.WriteHeader(seq, timestamp, false);
                                await _next.WriteAsync(_silenceFrame, 0, _silenceFrame.Length).ConfigureAwait(false);
                            }
                            else
                                await _client.SetSpeakingAsync(false).ConfigureAwait(false);
                            nextTick += _ticksPerFrame;
                            seq++;
                            timestamp += OpusEncoder.FrameSamplesPerChannel;
                        }
#if DEBUG
                        var _ = _logger?.DebugAsync("Buffer underrun");
#endif
                    }
                }
                else
                    await Task.Delay((int)(dist)/*, _cancelToken*/).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException) { }
    });
}
/// <summary> Does nothing; the arguments are discarded. </summary>
public override void WriteHeader(ushort seq, uint timestamp, bool missed)
{
    //Ignore, we use our own timing
}
/// <summary>
///     Copies <paramref name="count"/> bytes from <paramref name="data"/> into a pooled buffer and
///     queues them as one frame, waiting for a free queue slot if the buffer is full.
/// </summary>
/// <param name="data"> Source array containing the frame. </param>
/// <param name="offset"> Index in <paramref name="data"/> at which the frame starts. </param>
/// <param name="count"> Number of bytes to queue. </param>
/// <param name="cancelToken"> Token that aborts the wait for a free slot, in addition to the stream's own token. </param>
/// <exception cref="OperationCanceledException"> The wait for a free slot was cancelled. </exception>
public override async Task WriteAsync(byte[] data, int offset, int count, CancellationToken cancelToken)
{
    CancellationTokenSource writeCancelToken = null;
    if (cancelToken.CanBeCanceled)
    {
        writeCancelToken = CancellationTokenSource.CreateLinkedTokenSource(cancelToken, _cancelToken);
        cancelToken = writeCancelToken.Token;
    }
    else
        cancelToken = _cancelToken;

    try
    {
        await _queueLock.WaitAsync(-1, cancelToken).ConfigureAwait(false);
        if (!_bufferPool.TryDequeue(out byte[] buffer))
        {
#if DEBUG
            var _ = _logger?.DebugAsync("Buffer overflow"); //Should never happen because of the queueLock
#endif
            return;
        }

        try
        {
            Buffer.BlockCopy(data, offset, buffer, 0, count);
        }
        catch
        {
            // The frame was never queued: hand the buffer and the slot back (buffer first, so the
            // next writer to get the slot finds a buffer) instead of shrinking capacity for good.
            _bufferPool.Enqueue(buffer);
            _queueLock.Release();
            throw;
        }

        _queuedFrames.Enqueue(new Frame(buffer, count));
        if (!_isPreloaded && _queuedFrames.Count == _queueLength)
        {
#if DEBUG
            var _ = _logger?.DebugAsync("Preloaded");
#endif
            _isPreloaded = true;
        }
    }
    finally
    {
        // Runs on every exit path, including cancellation of the wait above, so the linked
        // source never stays registered on the long-lived stream token.
        writeCancelToken?.Dispose();
    }
}
/// <summary> Waits until every queued frame has been taken by the background sender. </summary>
/// <param name="cancelToken"> Token that aborts the wait. </param>
/// <exception cref="OperationCanceledException"> <paramref name="cancelToken"/> was signalled. </exception>
public override async Task FlushAsync(CancellationToken cancelToken)
{
    while (true)
    {
        cancelToken.ThrowIfCancellationRequested();
        if (_queuedFrames.IsEmpty)
            return;

        // The sender only starts once the queue is flagged as preloaded, which a write does only
        // when the queue is completely full. Flag it here so a partially filled queue is drained
        // too; otherwise this loop would spin until cancelled.
        _isPreloaded = true;

        await Task.Delay(250, cancelToken).ConfigureAwait(false);
    }
}
/// <summary> Discards every queued frame and returns its buffer and queue slot for reuse. </summary>
/// <param name="cancelToken"> Checked before each frame is removed. </param>
/// <returns> An already-completed task. </returns>
/// <exception cref="OperationCanceledException"> <paramref name="cancelToken"/> was signalled; thrown synchronously. </exception>
public override Task ClearAsync(CancellationToken cancelToken)
{
    cancelToken.ThrowIfCancellationRequested();
    while (_queuedFrames.TryDequeue(out Frame frame))
    {
        // Every queued frame holds one pooled buffer and one semaphore permit. Give both back
        // (buffer first, so a writer that obtains the permit always finds a buffer); dropping
        // them would shrink the usable queue until writers block forever.
        _bufferPool.Enqueue(frame.Buffer);
        _queueLock.Release();

        cancelToken.ThrowIfCancellationRequested();
    }
    return Task.Delay(0);
}
}
}