Эх сурвалжийг харах

Add proxy mode.
Add restart logic for ffmpeg.

justin.xing 4 жил өмнө
parent
commit
7854e39ece

+ 26 - 5
DotnetRtmpServer/Net/Http/HlsProducer.cs

@@ -3,25 +3,38 @@ using System.Diagnostics;
 using System.IO;
 using System.Linq;
 using System.Threading;
+using System.Threading.Tasks;
 using DotnetRtmpServer.Log;
 
 namespace DotnetRtmpServer.Net.Http
 {
-    class HlsProducer:IDisposable
+    internal class HlsProducer:IDisposable
     {
+        private readonly string _streamSource;
         private readonly string _cacheFolder;
-        private readonly int _processId;
-        private readonly Process _process;
 
-        public HlsProducer(string streamSource, string cacheFolder)
+        private int _processId;
+        private Process _process;
+
+        public event AsyncEventHandler Closed;
+
+        public string ChannelName { get; }
+
+        public HlsProducer(string channelName, string streamSource, string cacheFolder)
         {
+            ChannelName = channelName;
+            _streamSource = streamSource;
             _cacheFolder = cacheFolder;
             if (!Directory.Exists(_cacheFolder))
             {
                 Directory.CreateDirectory(_cacheFolder);
             }
+        }
+
+        public void Start()
+        {
             var streamIndex = Path.Combine(_cacheFolder, "play.m3u8");
-            var args = $"-v verbose -i {streamSource} -c:v copy -c:a aac -hls_time 3 -hls_list_size 5 -hls_wrap 5 -start_number 1 {streamIndex}";
+            var args = $"-v verbose -i {_streamSource} -c:v copy -c:a aac -hls_time 3 -hls_list_size 5 -hls_wrap 5 -start_number 1 {streamIndex}";
             _process = Process.Start(new ProcessStartInfo(Config.FFmpegPath)
             {
                 Arguments = args,
@@ -33,6 +46,14 @@ namespace DotnetRtmpServer.Net.Http
                 throw new InvalidOperationException("Could not start HLS transcoder.");
             }
             _processId = _process.Id;
+            Task.Run(async () =>
+            {
+                await _process.WaitForExitAsync().ConfigureAwait(false);
+                if (Closed != null)
+                {
+                    await Closed(this, EventArgs.Empty).ConfigureAwait(false);
+                }
+            });
         }
 
         public void Dispose()

+ 126 - 53
DotnetRtmpServer/Net/Http/HttpServer.cs

@@ -3,6 +3,7 @@ using System.Collections.Concurrent;
 using System.IO;
 using System.Linq;
 using System.Net;
+using System.Runtime.InteropServices;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
@@ -27,6 +28,8 @@ namespace DotnetRtmpServer.Net.Http
 
         private bool _isClosed;
 
+        public event AsyncEventHandler<HttpPlayEventArgs> HttpPlay; 
+
         public HttpServer()
         {
             if (Config.EnableHlsExtension)
@@ -143,57 +146,82 @@ namespace DotnetRtmpServer.Net.Http
             context.Response.ContentType = "application/octet-stream";
             context.Response.StatusCode = (int)HttpStatusCode.OK;
             var channelName = path.Replace(".flv", string.Empty);
-            var player = new HttpFlvPlayer(channelName, context);
-            player.Closed += async(sender, args) =>
+            var appAndStream = channelName.Split('/');
+            if (appAndStream.Length != 2)
             {
-                lock (_players)
+                Logger.WriteLineWarn($"Request publisher for channel {channelName} does not exist.");
+                await Write404Async(context).ConfigureAwait(false);
+            }
+            else
+            {
+                var app = appAndStream[0];
+                var stream = appAndStream[1];
+                if (Config.ProxyMode)
+                {
+                    channelName = $"proxy/{app}/{stream}";
+                }
+                var player = new HttpFlvPlayer(channelName, context);
+                player.Closed += async (sender, args) =>
                 {
-                    if (_players.TryGetValue(player.ChannelName, out var players))
+                    lock (_players)
                     {
-                        players.TryRemove(player.Id, out _);
-                        Logger.WriteLineInfo($"One http-flv player for channel:{player.ChannelName} closed.");
-                        if (players.Count == 0)
+                        if (_players.TryGetValue(player.ChannelName, out var players))
                         {
-                            _players.TryRemove(player.ChannelName, out _);
+                            players.TryRemove(player.Id, out _);
                             Logger.WriteLineInfo($"One http-flv player for channel:{player.ChannelName} closed.");
+                            if (players.Count == 0)
+                            {
+                                _players.TryRemove(player.ChannelName, out _);
+                                Logger.WriteLineInfo($"One http-flv player for channel:{player.ChannelName} closed.");
+                            }
                         }
                     }
-                }
-                await Task.CompletedTask.ConfigureAwait(false);
-            };
-            if (_publishers.TryGetValue(player.ChannelName, out var publisher))
-            {
-                //Send meta data
-                await player.SendMetaDataAsync(publisher.VideoConfigureRecord, publisher.AudioConfigureRecord).ConfigureAwait(false);
-                if (Config.EnablePublishVideoCache)
+                    await Task.CompletedTask.ConfigureAwait(false);
+                };
+
+                //Notify the server create proxy if in proxy mode.
+                if (Config.ProxyMode)
                 {
-                    //Fast play
-                    var videoCaches = publisher.GetVideoCaches();
-                    foreach (var videoCache in videoCaches)
+                    if (HttpPlay != null)
                     {
-                       await player.SendVideoDataAsync(videoCache).ConfigureAwait(false);
+                        await HttpPlay(this, new HttpPlayEventArgs(app, stream)).ConfigureAwait(false);
                     }
                 }
-            }
-            else
-            {
-                Logger.WriteLineWarn($"Request publisher for channel {player.ChannelName} does not exist.");
-                await player.CloseAsync().ConfigureAwait(false);
-            }
 
-            if (!player.IsClosed)
-            {
-                _players.AddOrUpdate(channelName, _ =>
+                if (_publishers.TryGetValue(player.ChannelName, out var publisher))
                 {
-                    var players = new ConcurrentDictionary<ushort, IPlayer>();
-                    players.TryAdd(player.Id, player);
-                    return players;
-                }, (_, players) =>
+                    //Send meta data
+                    await player.SendMetaDataAsync(publisher.VideoConfigureRecord, publisher.AudioConfigureRecord).ConfigureAwait(false);
+                    if (Config.EnablePublishVideoCache)
+                    {
+                        //Fast play
+                        var videoCaches = publisher.GetVideoCaches();
+                        foreach (var videoCache in videoCaches)
+                        {
+                            await player.SendVideoDataAsync(videoCache).ConfigureAwait(false);
+                        }
+                    }
+                }
+                else
+                {
+                    Logger.WriteLineWarn($"Request publisher for channel {player.ChannelName} does not exist.");
+                    await player.CloseAsync().ConfigureAwait(false);
+                }
+
+                if (!player.IsClosed)
                 {
-                    players.TryAdd(player.Id,player);
-                    return players;
-                });
-                Logger.WriteLineInfo($"Http-flv player:{channelName} registered.");
+                    _players.AddOrUpdate(channelName, _ =>
+                    {
+                        var players = new ConcurrentDictionary<ushort, IPlayer>();
+                        players.TryAdd(player.Id, player);
+                        return players;
+                    }, (_, players) =>
+                    {
+                        players.TryAdd(player.Id, player);
+                        return players;
+                    });
+                    Logger.WriteLineInfo($"Http-flv player:{channelName} registered.");
+                }
             }
         }
 
@@ -219,11 +247,25 @@ namespace DotnetRtmpServer.Net.Http
                 {
                     var app = sections[0];
                     var stream = sections[1];
+
+                    //Notify the server create proxy if in proxy mode.
+                    if (Config.ProxyMode)
+                    {
+                        if (HttpPlay != null)
+                        {
+                            await HttpPlay(this, new HttpPlayEventArgs(app, stream)).ConfigureAwait(false);
+                        }
+                    }
+
                     var playFile = sections[2];
-                    var rtmpChannelName = $"{app}/{stream}";
-                    if (_publishers.ContainsKey(rtmpChannelName))
+                    var channelName = $"proxy/{app}/{stream}";
+                    if (_publishers.ContainsKey(channelName))
                     {
-                        var playFilePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "HLS", app, stream, playFile);
+                        var baseDir = Config.ProxyMode
+                            ?
+                            Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "HLS", "Proxy")
+                            : Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "HLS");
+                        var playFilePath =  Path.Combine(baseDir, app, stream, playFile);
                         if (File.Exists(playFilePath))
                         {
                             var tryTimes = 0;
@@ -250,7 +292,7 @@ namespace DotnetRtmpServer.Net.Http
                                 catch
                                 {
                                     //If the file is being written, wait for 1 second for it end the writing.
-                                    Thread.Sleep(1000);
+                                    await Task.Delay(1000).ConfigureAwait(false);
                                 }
                             }
                         }
@@ -298,28 +340,59 @@ namespace DotnetRtmpServer.Net.Http
             if (Config.EnableHlsExtension)
             {
                 //For HLS
+                var app = string.Empty;
+                var stream = string.Empty;
+                var cacheFolder = string.Empty;
+                var streamSource = string.Empty;
+                var supportHls = false;
                 var appAndStream = publisher.ChannelName.Split('/');
                 if (appAndStream.Length == 2)
                 {
-                    var app = appAndStream[0];
-                    var stream = appAndStream[1];
-                    var cacheFolder = Path.Combine(Config.HlsCacheFolder, app, stream);
-                    var streamSource = $"rtmp://127.0.0.1:{Config.RtmpPort}/{app}/{stream}";
-                    var producer = new HlsProducer(streamSource, cacheFolder);
-                    _hlsProducers.AddOrUpdate(publisher.ChannelName, _ => producer, (_, exist) =>
-                    {
-                        if (exist != producer)
-                        {
-                            exist.Dispose();
-                        }
+                    app = appAndStream[0];
+                    stream = appAndStream[1];
+                    cacheFolder = Path.Combine(Config.HlsCacheFolder, app, stream);
+                    streamSource = $"rtmp://127.0.0.1:{Config.RtmpPort}/{app}/{stream}";
+                    supportHls = true;
+                }
+                else if (appAndStream.Length == 3)
+                {
+                    app = appAndStream[1];
+                    stream = appAndStream[2];
+                    cacheFolder = Path.Combine(Config.HlsCacheFolder, "proxy", app, stream);
+                    streamSource = $"rtmp://127.0.0.1:{Config.RtmpPort}/{app}/{stream}";
+                    supportHls = true;
+                }
 
+                if (supportHls)
+                {
+                    _hlsProducers.AddOrUpdate(publisher.ChannelName, _ =>
+                    {
+                        var producer = new HlsProducer(publisher.ChannelName, streamSource, cacheFolder);
+                        producer.Closed += OnHlsProducerClosedAsync;
+                        producer.Start();
                         return producer;
-                    });
+                    }, (_, exist) => exist);
                     Logger.WriteLineInfo($"HLS producer for channel:{publisher.ChannelName} added.");
                 }
             }
         }
 
+        private async Task OnHlsProducerClosedAsync(object sender, EventArgs args)
+        {
+            var producer = (HlsProducer)sender;
+            if (_publishers.ContainsKey(producer.ChannelName))
+            {
+                //Publisher is still there, it could be the producer crashed, so restart it again.
+                producer.Start();
+            }
+            else
+            {
+                producer.Closed -= OnHlsProducerClosedAsync;
+                _hlsProducers.TryRemove(producer.ChannelName, out _);
+                await Task.CompletedTask.ConfigureAwait(false);
+            }
+        }
+
 
         private async Task OnPublisherAudioDataReceiveAsync(object sender, AudioData e)
         {

+ 2 - 0
DotnetRtmpServer/Net/Http/IHttpExtension.cs

@@ -2,6 +2,8 @@
 {
     interface IHttpExtension
     {
+        event AsyncEventHandler<HttpPlayEventArgs> HttpPlay;
+
         void AddPublisher(IPublisher publisher);
 
         void Close();

+ 5 - 5
DotnetRtmpServer/Net/RtmpConnection.cs

@@ -27,7 +27,7 @@ namespace DotnetRtmpServer.Net
         private static readonly IdGenerator ConnectionIdGenerator = new();
         private static readonly IdGenerator StreamIdGenerator = new();
 
-        private readonly Socket _socket;
+        private readonly TcpClient _client;
 
         private readonly RtmpPacketWriter _writer;
         private readonly RtmpPacketReader _reader;
@@ -67,11 +67,11 @@ namespace DotnetRtmpServer.Net
 
         public bool IsClosed { get; private set; }
 
-        public RtmpConnection(Socket socket)
+        public RtmpConnection(TcpClient client)
         {
             Id = ConnectionIdGenerator.GetUniqueId();
-            _socket = socket;
-            var networkStream = new NetworkStream(_socket);
+            _client = client;
+            var networkStream = _client.GetStream();
             _writer = new RtmpPacketWriter(networkStream);
             _reader = new RtmpPacketReader(networkStream);
             _reader.EventReceived += EventReceivedCallbackAsync;
@@ -524,7 +524,7 @@ namespace DotnetRtmpServer.Net
         {
             if (!_disposed)
             {
-                _socket.Close();
+                _client.Dispose();
                 _reader.EventReceived -= EventReceivedCallbackAsync;
                 _disposed = true;
             }

+ 0 - 1
DotnetRtmpServer/Net/RtmpPacketReader.cs

@@ -184,7 +184,6 @@ namespace DotnetRtmpServer.Net
             {
                 header.Timestamp = await _reader.ReadInt32Async().ConfigureAwait(false);
             }
-            Logger.WriteLineInfo($"Receive video header {chunkMessageHeaderType} time:{header.Timestamp} packlen:{header.PacketLength} relative:{header.IsTimerRelative}");
             return header;
         }
 

+ 2 - 5
DotnetRtmpServer/Net/RtmpProxy.cs

@@ -9,8 +9,6 @@ namespace DotnetRtmpServer.Net
 {
     internal class RtmpProxy
     {
-        public static readonly string ProxyId = Guid.NewGuid().ToString("N").ToLower();
-
         private readonly string _sourceAddress;
         private readonly string _localAddress;
 
@@ -21,17 +19,16 @@ namespace DotnetRtmpServer.Net
 
         public string ChannelName { get; }
 
-
         public RtmpProxy(string proxySourceHost, string app, string stream)
         {
             _sourceAddress = proxySourceHost + "/" + app + "/" + stream;
-            ChannelName = $"{app}_{ProxyId}/{stream}";
+            ChannelName = $"proxy/{app}/{stream}";
             _localAddress = $"rtmp://127.0.0.1:{Config.RtmpPort}/{ChannelName}";
         }
 
         public void Start()
         {
-            var args = $"-i {_sourceAddress} -acodec copy -vcodec copy -f flv -y {_localAddress}";
+            var args = $"-i {_sourceAddress} -acodec copy -vcodec copy -f flv {_localAddress}";
             _process = Process.Start(new ProcessStartInfo(Config.FFmpegPath)
             {
                 Arguments = args,

+ 63 - 31
DotnetRtmpServer/Net/RtmpServer.cs

@@ -39,6 +39,7 @@ namespace DotnetRtmpServer.Net
                 try
                 {
                     _httpExtension = new HttpServer();
+                    _httpExtension.HttpPlay += OnHttpPlayAsync;
                 }
                 catch (Exception ex)
                 {
@@ -67,7 +68,6 @@ namespace DotnetRtmpServer.Net
                 StartIoProcess();
                 Logger.WriteLineInfo("Global IO processor started.");
             }
-
             Logger.WriteLineInfo("RTMP server started.");
         }
 
@@ -80,15 +80,15 @@ namespace DotnetRtmpServer.Net
                 {
                     while (Started)
                     {
-                        var socket = await _listener.AcceptSocketAsync().ConfigureAwait(false);
-                        socket.ReceiveTimeout = ReceiveTimeout;
-                        socket.SendTimeout = SendTimeout;
-                        HandshakeAsync(socket);
+                        var client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
+                        client.ReceiveTimeout = ReceiveTimeout;
+                        client.SendTimeout = SendTimeout;
+                        HandshakeAsync(client);
                     }
                 }
-                catch (Exception e)
+                catch (Exception ex)
                 {
-                    Logger.WriteLineError(e.ToString());
+                    Logger.WriteLineError($"TCP error {ex}");
                 }
             });
         }
@@ -104,19 +104,19 @@ namespace DotnetRtmpServer.Net
                         await Task.WhenAll(_connections.Select(x => x.Value.ProcessOnceAsync()));
                     }
                 }
-                catch (Exception e)
+                catch (Exception ex)
                 {
-                    Logger.WriteLineError(e.ToString());
+                    Logger.WriteLineError($"Process global connection IOs error {ex}");
                 }
             });
         }
 
 
-        public async void HandshakeAsync(Socket socket)
+        public async void HandshakeAsync(TcpClient client)
         {
             try
             {
-                var stream = new NetworkStream(socket);
+                var stream = client.GetStream();
                 var randomBytes = new byte[1528];
                 _random.NextBytes(randomBytes);
                 var s1 = new RtmpHandshake()
@@ -143,7 +143,7 @@ namespace DotnetRtmpServer.Net
                     throw new ProtocolViolationException();
                 }
 
-                var connection = new RtmpConnection(socket);
+                var connection = new RtmpConnection(client);
                 connection.RequestConnect += OnConnectionRequestConnectAsync;
                 connection.RequestPublish += OnConnectionRequestPublishAsync;
                 connection.Published += OnConnectionPublishedAsync;
@@ -163,8 +163,8 @@ namespace DotnetRtmpServer.Net
             }
             catch (Exception)
             {
-                Logger.WriteLineInfo($"Handshake error for socket {socket.RemoteEndPoint}.");
-                socket.Close();
+                Logger.WriteLineInfo($"Handshake error for socket {client.Client.RemoteEndPoint}.");
+                client.Close();
             }
         }
 
@@ -191,6 +191,24 @@ namespace DotnetRtmpServer.Net
         }
 
 
+        private async Task OnHttpPlayAsync(object sender, HttpPlayEventArgs e)
+        {
+            if (Config.ProxyMode)
+            {
+                var channelName = $"proxy/{e.App}/{e.Stream}";
+                _proxies.AddOrUpdate(channelName, _ =>
+                {
+                    var proxy = new RtmpProxy(Config.ProxySourceHost, e.App, e.Stream);
+                    proxy.Closed += OnProxyClosedAsync;
+                    proxy.Start();
+                    return proxy;
+                }, (_, exist) => exist);
+            }
+
+            await Task.CompletedTask.ConfigureAwait(false);
+        }
+
+
         #region Connection
         private async Task OnConnectionRequestConnectAsync(object sender, ConnectEventArgs e)
         {
@@ -201,8 +219,7 @@ namespace DotnetRtmpServer.Net
                     e.AuthSuccess = _registeredApps.IndexOf(e.App) != -1;
                     if (!e.AuthSuccess)
                     {
-                        Logger.WriteLineError(
-                            $"Auth connect failed, connect app is {e.App}, exist apps are [{string.Join(",", _registeredApps)}]");
+                        Logger.WriteLineError( $"Auth connect failed, connect app is {e.App}, exist apps are [{string.Join(",", _registeredApps)}]");
                     }
                 }
             }
@@ -232,10 +249,18 @@ namespace DotnetRtmpServer.Net
 
         private async Task OnConnectionRequestPublishAsync(object sender, PublishEventArgs e)
         {
-            if (Config.ProxyMode && !e.App.Contains(RtmpProxy.ProxyId))
+            if (Config.ProxyMode)
             {
-                Logger.WriteLineWarn("Can not publish stream under proxy mode.");
-                e.CanPublish = false;
+                var subNames = e.App.Split('/');
+                if (subNames.Length != 2 || subNames[0] != "proxy")
+                {
+                    Logger.WriteLineWarn("Can not direct publish stream under proxy mode.");
+                    e.CanPublish = false;
+                }
+                else
+                {
+                    e.CanPublish = true;
+                }
             }
             else
             {
@@ -353,7 +378,7 @@ namespace DotnetRtmpServer.Net
         {
             if (Config.ProxyMode)
             {
-                var channelName = $"{e.App}_{RtmpProxy.ProxyId}/{e.Path}";
+                var channelName = $"proxy/{e.App}/{e.Path}";
                 _proxies.AddOrUpdate(channelName, _ =>
                 {
                     var proxy = new RtmpProxy(Config.ProxySourceHost, e.App, e.Path);
@@ -389,16 +414,30 @@ namespace DotnetRtmpServer.Net
         private async Task OnProxyClosedAsync(object sender, EventArgs args)
         {
             var proxy = (RtmpProxy) sender;
-            proxy.Closed -= OnProxyClosedAsync;
-            _proxies.TryRemove(proxy.ChannelName, out _);
-            await Task.CompletedTask.ConfigureAwait(false);
+            bool canCloseProxy = true;
+            if (_players.TryGetValue(proxy.ChannelName, out var players))
+            {
+                if (players.Count > 0)
+                {
+                    //Still has player to play, restart the proxy.
+                    proxy.Start();
+                    canCloseProxy = false;
+                }
+            }
+
+            if (canCloseProxy)
+            {
+                proxy.Closed -= OnProxyClosedAsync;
+                _proxies.TryRemove(proxy.ChannelName, out _);
+                await Task.CompletedTask.ConfigureAwait(false);
+            }
         }
 
 
         private async Task OnConnectionPlayedAsync(object sender, PlayEventArgs e)
         {
             var connection = (RtmpConnection)sender;
-            var channelName = (Config.ProxyMode? (e.App + "_" + RtmpProxy.ProxyId) : e.App) + "/" + e.Path;
+            var channelName = (Config.ProxyMode? $"proxy/{e.App}" : e.App) + "/" + e.Path;
             var player = new RtmpPlayer(channelName, connection);
             player.Closed += OnPlayerClosedAsync;
 
@@ -455,13 +494,6 @@ namespace DotnetRtmpServer.Net
                     if (players.Count == 0)
                     {
                         _players.TryRemove(player.ChannelName, out _);
-                        if (Config.ProxyMode)
-                        {
-                            if (_proxies.TryGetValue(player.ChannelName, out var proxy))
-                            {
-                                proxy.Dispose();
-                            }
-                        }
                     }
                     else
                     {

+ 1 - 0
TestServer/Program.cs

@@ -9,6 +9,7 @@ namespace TestServer
         {
             //Config.ProxySourceHost = "rtmp://192.168.6.55";
             //Config.ProxyMode = true;
+            //Config.EnableHlsExtension = true;
             var server = new RtmpServer();
             server.Start();
             Console.Read();

+ 0 - 6
TestServer/TestServer.csproj.user

@@ -1,6 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-<Project ToolsVersion="Current" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
-  <PropertyGroup>
-    <_LastSelectedProfileId>D:\New-Projects\DotnetRtmpServer\TestServer\Properties\PublishProfiles\FolderProfile.pubxml</_LastSelectedProfileId>
-  </PropertyGroup>
-</Project>