Browse Source

1. Add support for ffmpeg.
2. Add proxy support, depends on ffmpeg.

justin.xing 4 years ago
parent
commit
71d64e0da2

+ 1 - 1
DotnetRtmpServer.sln

@@ -5,7 +5,7 @@ VisualStudioVersion = 16.0.31105.61
 MinimumVisualStudioVersion = 10.0.40219.1
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotnetRtmpServer", "DotnetRtmpServer\DotnetRtmpServer.csproj", "{98601E2D-FC97-46AC-9E0D-604E760FD86B}"
 EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestServer", "TestServer\TestServer.csproj", "{53F9B931-9655-4680-A102-8044C96AB0E2}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestServer", "TestServer\TestServer.csproj", "{53F9B931-9655-4680-A102-8044C96AB0E2}"
 EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution

+ 4 - 0
DotnetRtmpServer/DotnetRtmpServer.csproj

@@ -8,4 +8,8 @@
     <Optimize>false</Optimize>
   </PropertyGroup>
 
+  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
+    <Optimize>true</Optimize>
+  </PropertyGroup>
+
 </Project>

+ 0 - 9
DotnetRtmpServer/IO/AmfReader.cs

@@ -95,15 +95,6 @@ namespace DotnetRtmpServer.IO
 
         # region helpers
 
-
-        public void Reset()
-        {
-            _amf0ObjectReferences.Clear();
-            _amf3ObjectReferences.Clear();
-            _stringReferences.Clear();
-            _amf3ClassDefinitions.Clear();
-        }
-
         public async Task<int> ReadReverseIntAsync()
         {
             var bytes = await ReadBytesAsync(4).ConfigureAwait(false);

+ 11 - 1
DotnetRtmpServer/Net/Config.cs

@@ -3,7 +3,7 @@ using System.IO;
 
 namespace DotnetRtmpServer.Net
 {
-    class Config
+    public class Config
     {
         /// <summary>
         /// The port which the rtmp server will use.
@@ -66,6 +66,16 @@ namespace DotnetRtmpServer.Net
         /// </summary>
         public static string HlsCacheFolder = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "HLS");
 
+        /// <summary>
+        /// If true the proxy will be enabled, which will pull the stream from another server and push to this one.
+        /// </summary>
+        public static bool ProxyMode = false;
+
+        /// <summary>
+        /// The proxy's source host which provide the source stream. 
+        /// </summary>
+        public static string ProxySourceHost = "rtmp://localhost";
+
         /// <summary>
         /// Define the ffmpeg path.
         /// </summary>

+ 9 - 1
DotnetRtmpServer/Net/RtmpHeader.cs

@@ -13,7 +13,15 @@ namespace DotnetRtmpServer.Net
 
         public RtmpHeader Clone()
         {
-            return (RtmpHeader)MemberwiseClone();
+            return new()
+            {
+                PacketLength = PacketLength,
+                StreamId = StreamId,
+                MessageType = MessageType,
+                MessageStreamId = MessageStreamId,
+                Timestamp = Timestamp,
+                IsTimerRelative = IsTimerRelative,
+            };
         }
         
     }

+ 16 - 4
DotnetRtmpServer/Net/RtmpPacketReader.cs

@@ -21,6 +21,8 @@ namespace DotnetRtmpServer.Net
 
         private readonly NetworkStream _stream;
         private readonly AmfReader _reader;
+        private int _latTimeDelta;
+        private int _lastTimestamp;
         private readonly Dictionary<int, RtmpHeader> _rtmpHeaders = new();
         private readonly Dictionary<int, RtmpPacket> _rtmpPackets = new();
 
@@ -110,6 +112,7 @@ namespace DotnetRtmpServer.Net
             }
         }
 
+
         private async Task<RtmpHeader> ReadHeaderAsync()
         {
             // first byte of the chunk basic header
@@ -129,6 +132,7 @@ namespace DotnetRtmpServer.Net
                 previousHeader = header.Clone();
             }
 
+            //按rtmp协议的规定,Type为3的时候表示这个chunk的Message Header和上一个是完全相同的.而且ffmpeg发送的应该为相对时间戳,而服务端处理需要为绝对时间戳
             switch (chunkMessageHeaderType)
             {
                 // 11 bytes
@@ -137,19 +141,26 @@ namespace DotnetRtmpServer.Net
                     header.PacketLength = await _reader.ReadUInt24Async().ConfigureAwait(false);
                     header.MessageType = (MessageType)await _reader.ReadByteAsync().ConfigureAwait(false);
                     header.MessageStreamId = await _reader.ReadReverseIntAsync().ConfigureAwait(false);
+                    _lastTimestamp = header.Timestamp;
+                    _latTimeDelta = 0;
                     break;
 
                 // 7 bytes
                 case ChunkMessageHeaderType.SameSource:
-                    header.Timestamp = await _reader.ReadUInt24Async().ConfigureAwait(false);
+                    _latTimeDelta = await _reader.ReadUInt24Async().ConfigureAwait(false);
+                    _lastTimestamp = _lastTimestamp + _latTimeDelta;
+                    header.Timestamp = _lastTimestamp;
                     header.PacketLength = await _reader.ReadUInt24Async().ConfigureAwait(false);
                     header.MessageType = (MessageType)await _reader.ReadByteAsync().ConfigureAwait(false);
                     header.MessageStreamId = previousHeader.MessageStreamId;
+                    _latTimeDelta = header.Timestamp;
                     break;
 
                 // 3 bytes
                 case ChunkMessageHeaderType.TimestampAdjustment:
-                    header.Timestamp = await _reader.ReadUInt24Async().ConfigureAwait(false);
+                    _latTimeDelta = await _reader.ReadUInt24Async().ConfigureAwait(false);
+                    _lastTimestamp = _lastTimestamp + _latTimeDelta;
+                    header.Timestamp = _lastTimestamp;
                     header.PacketLength = previousHeader.PacketLength;
                     header.MessageType = previousHeader.MessageType;
                     header.MessageStreamId = previousHeader.MessageStreamId;
@@ -157,10 +168,11 @@ namespace DotnetRtmpServer.Net
 
                 // 0 bytes
                 case ChunkMessageHeaderType.Continuation:
-                    header.Timestamp = previousHeader.Timestamp;
+                    header.Timestamp = _lastTimestamp + _latTimeDelta;
                     header.PacketLength = previousHeader.PacketLength;
                     header.MessageType = previousHeader.MessageType;
                     header.MessageStreamId = previousHeader.MessageStreamId;
+                    //Below is for handle the ffmpeg push.
                     header.IsTimerRelative = previousHeader.IsTimerRelative;
                     break;
                 default:
@@ -172,7 +184,7 @@ namespace DotnetRtmpServer.Net
             {
                 header.Timestamp = await _reader.ReadInt32Async().ConfigureAwait(false);
             }
-
+            Logger.WriteLineInfo($"Receive video header {chunkMessageHeaderType} time:{header.Timestamp} packlen:{header.PacketLength} relative:{header.IsTimerRelative}");
             return header;
         }
 

+ 73 - 0
DotnetRtmpServer/Net/RtmpProxy.cs

@@ -0,0 +1,73 @@
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using DotnetRtmpServer.Log;
+
+namespace DotnetRtmpServer.Net
+{
+    internal class RtmpProxy
+    {
+        public static readonly string ProxyId = Guid.NewGuid().ToString("N").ToLower();
+
+        private readonly string _sourceAddress;
+        private readonly string _localAddress;
+
+        private  int _processId;
+        private Process _process;
+
+        public event AsyncEventHandler Closed;
+
+        public string ChannelName { get; }
+
+
+        public RtmpProxy(string proxySourceHost, string app, string stream)
+        {
+            _sourceAddress = proxySourceHost + "/" + app + "/" + stream;
+            ChannelName = $"{app}_{ProxyId}/{stream}";
+            _localAddress = $"rtmp://127.0.0.1:{Config.RtmpPort}/{ChannelName}";
+        }
+
+        public void Start()
+        {
+            var args = $"-i {_sourceAddress} -acodec copy -vcodec copy -f flv -y {_localAddress}";
+            _process = Process.Start(new ProcessStartInfo(Config.FFmpegPath)
+            {
+                Arguments = args,
+                CreateNoWindow = true,
+                WindowStyle = ProcessWindowStyle.Hidden
+            });
+            if (_process == null)
+            {
+                throw new InvalidOperationException("Could not start rtmp proxy.");
+            }
+            _processId = _process.Id;
+            Task.Run(async () =>
+            {
+                await _process.WaitForExitAsync().ConfigureAwait(false);
+                if (Closed != null)
+                {
+                    await Closed(this, EventArgs.Empty).ConfigureAwait(false);
+                }
+            });
+        }
+
+        public void Dispose()
+        {
+            try
+            {
+                _process?.Kill();
+            }
+            catch (Exception ex)
+            {
+                Logger.WriteLineWarn($"Close rtmp proxy failed. error:{ex.Message}");
+            }
+            //Wait for the process exit.
+            while (Process.GetProcesses().Any(x => x.Id == _processId))
+            {
+                Thread.Sleep(1);
+            }
+        }
+    }
+}

+ 123 - 60
DotnetRtmpServer/Net/RtmpServer.cs

@@ -21,6 +21,7 @@ namespace DotnetRtmpServer.Net
         private readonly ConcurrentDictionary<string, IPublisher> _publishers = new();
         private readonly ConcurrentDictionary<string, List<IPlayer>> _players = new();
         private readonly ConcurrentDictionary<ushort, RtmpConnection> _connections = new();
+        private readonly ConcurrentDictionary<string, RtmpProxy> _proxies = new();
 
 
         private readonly Random _random = new();
@@ -50,7 +51,10 @@ namespace DotnetRtmpServer.Net
                 }
             }
             _listener = new TcpListener(IPAddress.Any, Config.RtmpPort);
-            RegisterApp(Config.DefaultApp);
+            if (!Config.ProxyMode)
+            {
+                RegisterApp(Config.DefaultApp);
+            }
         }
 
         public void Start()
@@ -110,49 +114,57 @@ namespace DotnetRtmpServer.Net
 
         public async void HandshakeAsync(Socket socket)
         {
-            var stream = new NetworkStream(socket);
-            var randomBytes = new byte[1528];
-            _random.NextBytes(randomBytes);
-            var s01 = new RtmpHandshake()
+            try
             {
-                Version = 3,
-                Time = (uint)Environment.TickCount,
-                Time2 = 0,
-                Random = randomBytes
-            };
-
-            var c01 = await RtmpHandshake.ReadAsync(stream, true).ConfigureAwait(false);
-            //S0 + S1
-            await RtmpHandshake.WriteAsync(stream, s01, true).ConfigureAwait(false);
-            //S0 + S1 + S2
-            //S2 = S1
-            var s2 = c01.Clone();
-            await RtmpHandshake.WriteAsync(stream, s2, false);
-            //Read C2
-            var c2 = await RtmpHandshake.ReadAsync(stream, false);
-
-            //handshake check
-            if (!c2.Random.SequenceEqual(s01.Random))
-            {
-                throw new ProtocolViolationException();
-            }
+                var stream = new NetworkStream(socket);
+                var randomBytes = new byte[1528];
+                _random.NextBytes(randomBytes);
+                var s1 = new RtmpHandshake()
+                {
+                    Version = 3,
+                    Time = (uint) Environment.TickCount,
+                    Time2 = 0,
+                    Random = randomBytes
+                };
+
+                var c1 = await RtmpHandshake.ReadAsync(stream, true).ConfigureAwait(false);
+                //S0 + S1
+                await RtmpHandshake.WriteAsync(stream, s1, true).ConfigureAwait(false);
+                //S0 + S1 + S2
+                //S2 = S1
+                var s2 = c1.Clone();
+                await RtmpHandshake.WriteAsync(stream, s2, false);
+                //Read C2
+                var c2 = await RtmpHandshake.ReadAsync(stream, false).ConfigureAwait(false);
+
+                //handshake check
+                if (!c2.Random.SequenceEqual(s1.Random))
+                {
+                    throw new ProtocolViolationException();
+                }
 
-            var connection = new RtmpConnection(socket);
-            connection.RequestConnect += OnConnectionRequestConnectAsync;
-            connection.RequestPublish += OnConnectionRequestPublishAsync;
-            connection.Published += OnConnectionPublishedAsync;
-            connection.RequestPlay += OnConnectionRequestPlayAsync;
-            connection.Played += OnConnectionPlayedAsync;
-            connection.Closed += OnConnectionClosedAsync;
-            _connections.AddOrUpdate(connection.Id, connection, (_, exists) =>
-             {
-                 exists.DisposeAsync().AsTask().Wait();
-                 return connection;
-             });
-            if (!Config.EnableLowPowerMode)
+                var connection = new RtmpConnection(socket);
+                connection.RequestConnect += OnConnectionRequestConnectAsync;
+                connection.RequestPublish += OnConnectionRequestPublishAsync;
+                connection.Published += OnConnectionPublishedAsync;
+                connection.RequestPlay += OnConnectionRequestPlayAsync;
+                connection.Played += OnConnectionPlayedAsync;
+                connection.Closed += OnConnectionClosedAsync;
+                _connections.AddOrUpdate(connection.Id, connection, (_, exists) =>
+                {
+                    exists.DisposeAsync().AsTask().Wait();
+                    return connection;
+                });
+                if (!Config.EnableLowPowerMode)
+                {
+                    connection.StartIoProcess();
+                    Logger.WriteLineInfo($"IO processor for connection {connection.Id} started.");
+                }
+            }
+            catch (Exception)
             {
-                connection.StartIoProcess();
-                Logger.WriteLineInfo($"IO processor for connection {connection.Id} started.");
+                Logger.WriteLineInfo($"Handshake error for socket {socket.RemoteEndPoint}.");
+                socket.Close();
             }
         }
 
@@ -182,14 +194,23 @@ namespace DotnetRtmpServer.Net
         #region Connection
         private async Task OnConnectionRequestConnectAsync(object sender, ConnectEventArgs e)
         {
-            lock (_registeredApps)
+            if (!Config.ProxyMode)
             {
-                e.AuthSuccess = _registeredApps.IndexOf(e.App) != -1;
-                if (!e.AuthSuccess)
+                lock (_registeredApps)
                 {
-                    Logger.WriteLineError($"Auth connect failed, connect app is {e.App}, exist apps are [{string.Join(",", _registeredApps)}]");
+                    e.AuthSuccess = _registeredApps.IndexOf(e.App) != -1;
+                    if (!e.AuthSuccess)
+                    {
+                        Logger.WriteLineError(
+                            $"Auth connect failed, connect app is {e.App}, exist apps are [{string.Join(",", _registeredApps)}]");
+                    }
                 }
             }
+            else
+            {
+                e.AuthSuccess = true;
+            }
+
             await Task.CompletedTask.ConfigureAwait(false);
         }
 
@@ -211,16 +232,25 @@ namespace DotnetRtmpServer.Net
 
         private async Task OnConnectionRequestPublishAsync(object sender, PublishEventArgs e)
         {
-            var channelName = e.App + "/" + e.Path;
-            if (_publishers.ContainsKey(channelName))
+            if (Config.ProxyMode && !e.App.Contains(RtmpProxy.ProxyId))
             {
-                Logger.WriteLineWarn($"Publisher for channel:{channelName} already exists.");
+                Logger.WriteLineWarn("Can not publish stream under proxy mode.");
                 e.CanPublish = false;
             }
             else
             {
-                e.CanPublish = true;
+                var channelName = e.App + "/" + e.Path;
+                if (_publishers.ContainsKey(channelName))
+                {
+                    Logger.WriteLineWarn($"Publisher for channel:{channelName} already exists.");
+                    e.CanPublish = false;
+                }
+                else
+                {
+                    e.CanPublish = true;
+                }
             }
+
             await Task.CompletedTask.ConfigureAwait(false);
         }
 
@@ -321,24 +351,46 @@ namespace DotnetRtmpServer.Net
         #region player
         private async Task OnConnectionRequestPlayAsync(object sender, PlayEventArgs e)
         {
-            if (!Config.AllowPlayWithoutPublish)
+            if (Config.ProxyMode)
             {
-                var channelName = e.App + "/" + e.Path;
-                if (_publishers.ContainsKey(channelName))
+                var channelName = $"{e.App}_{RtmpProxy.ProxyId}/{e.Path}";
+                _proxies.AddOrUpdate(channelName, _ =>
                 {
-                    e.CanPlay = true;
+                    var proxy = new RtmpProxy(Config.ProxySourceHost, e.App, e.Path);
+                    proxy.Closed += OnProxyClosedAsync;
+                    proxy.Start();
+                    return proxy;
+                }, (_, exist) => exist);
+                e.CanPlay = true;
+            }
+            else
+            {
+                if (!Config.AllowPlayWithoutPublish)
+                {
+                    var channelName = e.App + "/" + e.Path;
+                    if (_publishers.ContainsKey(channelName))
+                    {
+                        e.CanPlay = true;
+                    }
+                    else
+                    {
+                        Logger.WriteLineWarn($"No publisher for channel:{channelName}.");
+                        e.CanPlay = false;
+                    }
                 }
                 else
                 {
-                    Logger.WriteLineWarn($"No publisher for channel:{channelName}.");
-                    e.CanPlay = false;
+                    e.CanPlay = true;
                 }
             }
-            else
-            {
-                e.CanPlay = true;
-            }
+            await Task.CompletedTask.ConfigureAwait(false);
+        }
 
+        private async Task OnProxyClosedAsync(object sender, EventArgs args)
+        {
+            var proxy = (RtmpProxy) sender;
+            proxy.Closed -= OnProxyClosedAsync;
+            _proxies.TryRemove(proxy.ChannelName, out _);
             await Task.CompletedTask.ConfigureAwait(false);
         }
 
@@ -346,7 +398,7 @@ namespace DotnetRtmpServer.Net
         private async Task OnConnectionPlayedAsync(object sender, PlayEventArgs e)
         {
             var connection = (RtmpConnection)sender;
-            var channelName = e.App + "/" + e.Path;
+            var channelName = (Config.ProxyMode? (e.App + "_" + RtmpProxy.ProxyId) : e.App) + "/" + e.Path;
             var player = new RtmpPlayer(channelName, connection);
             player.Closed += OnPlayerClosedAsync;
 
@@ -403,6 +455,13 @@ namespace DotnetRtmpServer.Net
                     if (players.Count == 0)
                     {
                         _players.TryRemove(player.ChannelName, out _);
+                        if (Config.ProxyMode)
+                        {
+                            if (_proxies.TryGetValue(player.ChannelName, out var proxy))
+                            {
+                                proxy.Dispose();
+                            }
+                        }
                     }
                     else
                     {
@@ -418,6 +477,10 @@ namespace DotnetRtmpServer.Net
 
         public void RegisterApp(string appName)
         {
+            if (Config.ProxyMode)
+            {
+                throw new InvalidOperationException("Can not register app under proxy mode.");
+            }
             lock (_registeredApps)
             {
                 if (_registeredApps.IndexOf(appName) != -1)

+ 2 - 0
TestServer/Program.cs

@@ -7,6 +7,8 @@ namespace TestServer
     {
         static void Main(string[] args)
         {
+            //Config.ProxySourceHost = "rtmp://192.168.6.55";
+            //Config.ProxyMode = true;
             var server = new RtmpServer();
             server.Start();
             Console.Read();

+ 1 - 1
TestServer/Properties/PublishProfiles/FolderProfile.pubxml.user

@@ -4,6 +4,6 @@ https://go.microsoft.com/fwlink/?LinkID=208121.
 -->
 <Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
   <PropertyGroup>
-    <History>True|2021-04-18T06:45:31.9218057Z;False|2021-04-18T14:45:12.7868388+08:00;</History>
+    <History>True|2021-04-20T03:33:25.2575335Z;True|2021-04-20T10:30:11.4811546+08:00;True|2021-04-20T09:26:16.3777791+08:00;True|2021-04-20T09:22:19.0555965+08:00;True|2021-04-20T08:36:58.1397068+08:00;True|2021-04-20T08:09:24.5924411+08:00;True|2021-04-20T07:54:11.3926891+08:00;True|2021-04-19T08:45:21.9449907+08:00;True|2021-04-18T14:45:31.9218057+08:00;False|2021-04-18T14:45:12.7868388+08:00;</History>
   </PropertyGroup>
 </Project>