Commit 0544ae82 authored by Mark Harding's avatar Mark Harding

(feat): transcoder notification - #1187

1 merge request!414WIP: New transcoder
Pipeline #100921703 passed with stages
in 7 minutes and 2 seconds
<?php
namespace Minds\Core\Media\Video\Transcoder\Delegates;
use Minds\Core\Di\Di;
use Minds\Core\Media\Video\Transcoder\Transcode;
use Minds\Core\Media\Video\Transcoder\TranscodeStates;
use Minds\Entities\Video;
use Minds\Core\Events\EventsDispatcher;
use Minds\Core\EntitiesBuilder;
class NotificationDelegate
{
/** @var TranscodeStates */
private $transcodeStates;
/** @var EventsDispatcher */
private $eventsDispatcher;
/** @var EntitiesBuilder */
private $entitiesBuilder;
public function __construct($transcodeStates = null, $eventsDispatcher = null, $entitiesBuilder = null)
{
$this->transcodeStates = $transcodeStates ?? new TranscodeStates();
$this->eventsDispatcher = $eventsDispatcher ?? Di::_()->get('EventsDispatcher');
$this->entitiesBuilder = $entitiesBuilder ?? Di::_()->get('EntitiesBuilder');
}
/**
* Add a transcode to the queue
* @param Transcode $transcode
* @return void
*/
public function onTranscodeCompleted(Transcode $transcode): void
{
$video = $this->entitiesBuilder->single($transcode->getGuid());
if (!$video || !$video instanceof Video) {
error_log("Video ({$transcode->getGuid()}not found");
return; // TODO: Tell sentry?
}
$status = $this->transcodeStates->getStatus($video);
if ($status === TranscodeStates::COMPLETED) {
$this->emitCompletedNotification($video);
} elseif ($status === TranscodeStates::FAILED) {
$this->emitFailedNotification($video);
}
}
/**
* @var Video $video
* @return void
*/
private function emitCompletedNotification(Video $video): void
{
$this->eventsDispatcher->trigger('notification', 'transcoder', [
'to'=> [ $video->getOwnerGuid() ],
'from' => 100000000000000519,
'notification_view' => 'transcode_completed',
'entity' => $video,
]);
}
/**
* @var Video $video
* @return void
*/
private function emitFailedNotification(Video $video): void
{
$this->eventsDispatcher->trigger('notification', 'transcoder', [
'to'=> [ $video->getOwnerGuid() ],
'from' => 100000000000000519,
'notification_view' => 'transcode_failed',
'entity' => $video,
]);
}
}
......@@ -5,6 +5,7 @@
namespace Minds\Core\Media\Video\Transcoder;
use Minds\Core\Media\Video\Transcoder\Delegates\QueueDelegate;
use Minds\Core\Media\Video\Transcoder\Delegates\NotificationDelegate;
use Minds\Entities\Video;
use Minds\Traits\MagicAttributes;
......@@ -21,6 +22,9 @@ class Manager
TranscodeProfiles\Webm_1080p::class,
];
/** @var int */
const TRANSCODER_TIMEOUT_SECS = 600; // 10 minutes with not progress
/** @var Repository */
private $repository;
......@@ -33,12 +37,16 @@ class Manager
/** @var TranscodeExecutors\TranscodeExecutorInterfsce */
private $transcodeExecutor;
public function __construct($repository = null, $queueDelegate = null, $transcodeStorage = null, $transcodeExecutor = null)
/** @var NotificationDelegate */
private $notificationDelegate;
public function __construct($repository = null, $queueDelegate = null, $transcodeStorage = null, $transcodeExecutor = null, $notificationDelegate = null)
{
$this->repository = $repository ?? new Repository();
$this->queueDelegate = $queueDelegate ?? new QueueDelegate();
$this->transcodeStorage = $transcodeStorage ?? new TranscodeStorage\S3Storage();
$this->transcodeExecutor = $transcodeExecutor ?? new TranscodeExecutors\FFMpegExecutor();
$this->notificationDelegate = $notificationDelegate ?? new NotificationDelegate();
}
/**
......@@ -110,7 +118,8 @@ class Manager
$transcode = new Transcode();
$transcode
->setVideo($video)
->setProfile(new $profile);
->setProfile(new $profile)
->setStatus(TranscodeStates::CREATED);
// Add the transcode to database and queue
$this->add($transcode);
} catch (TranscodeProfiles\UnavailableTranscodeProfileException $e) {
......@@ -173,14 +182,6 @@ class Manager
$this->update($transcode, [ 'progress', 'status' ]);
}
// Was this the last transcode to complete?
// if ($this->isLastToTrancode($transcode)) {
// // Sent a notification to the user saying the transcode is completed
// }
$this->notificationDelegate->onTranscodeCompleted($transcode);
}
// protected function isLastToTrancode(Transcode $transcode): bool
// {
// }
}
......@@ -38,7 +38,14 @@ class Repository
'status' => null,
], $opts);
$statement = "SELECT * FROM video_transcodes";
$statement = "SELECT
guid,
profile_id,
progress,
status,
last_event_timestamp_ms,
bytes
FROM video_transcodes";
$where = [];
$values = [];
......@@ -88,7 +95,14 @@ class Repository
$urn = Urn::_($urn);
list($guid, $profile) = explode('-', $urn->getNss());
$statement = "SELECT * FROM video_transcodes
$statement = "SELECT
guid,
profile_id,
progress,
status,
last_event_timestamp_ms,
bytes
FROM video_transcodes
WHERE guid = ?
AND profile = ?";
$values = [
......@@ -119,10 +133,11 @@ class Repository
*/
public function add(Transcode $transcode): bool
{
$statement = "INSERT INTO video_transcodes (guid, profile_id) VALUES (?, ?)";
$statement = "INSERT INTO video_transcodes (guid, profile_id, status) VALUES (?, ?)";
$values = [
new Bigint($transcode->getGuid()),
$transcode->getProfile()->getId(),
$transcode->getStatus(),
];
$prepared = new Custom();
......@@ -235,11 +250,11 @@ class Repository
$transcode = new Transcode();
$transcode->setGuid((string) $row['guid'])
->setProfile(TranscodeProfiles\Factory::build((string) $row['profile_id']))
->setProgress($row['progress']->value())
->setProgress($row['progress'])
->setStatus($row['status'])
->setLastEventTimestampMs(round($row['last_event_timestamp_ms']->microtime(true) * 1000))
->setLength($row['length_secs']->value())
->setBytes($row['bytes']->value());
->setLengthSecs($row['length_secs'])
->setBytes($row['bytes']);
return $transcode;
}
}
<?php
namespace Minds\Core\Media\Video\Transcoder;
use Minds\Entities\Video;
class TranscodeStates
{
/** @var string */
public const CREATED = 'created';
/** @var string */
public const TRANSCODING = 'transcoding';
/** @var string */
public const FAILED = 'failed';
/** @var string */
public const COMPLETED = 'completed';
/** @var Repository */
private $repository;
public function __construct($repository = null)
{
// NOTE: We are using repository as this is called via
// Delegates\NotificationDelegate and it causes an infinite loop
// with the manager
$this->repository = $repository ?? new Repository();
}
/**
* Return the overral transcoding status
* MH: I don't love this function at all!
* @param Video $video
* @return string
*/
public function getStatus(Video $video): string
{
$transcodes = $this->repository->getList([
'guid' => $video->getGuid(),
]);
$failures = 0;
$completed = 0;
foreach ($transcodes as $transcode) {
if ($transcode instanceof TranscodeProfiles\Thumbnails) {
continue; // We skip thumbnails as these are likely to succeed
}
switch ($transcode->getStatus()) {
case TranscodeStates::TRANSCODING:
if ($transcode->getLastEventTimestampMs() >= (time() - Manager::TRANSCODER_TIMEOUT_SECS) * 1000) {
// Still transcoding
return TranscodeStates::TRANSCODING;
} else {
++$failures;
}
break;
case TranscodeStates::CREATED:
// If not started to transcode then we are in a created state
return TranscodeStates::CREATED;
break;
case TranscodeStates::FAILED:
++$failures;
// We should allow failures for some transcodes
break;
case TranscodeStates::COMPLETED:
++$completed;
// We should allow failures for some transcodes
break;
}
}
// If we have more completions then failures the declare completed
if ($failures < $completed) {
return TranscodeStates::COMPLETED;
}
return TranscodeStates::FAILED; // Our default state is failed?
}
}
<?php
namespace Spec\Minds\Core\Media\Video\Transcoder\Delegates;
use Minds\Core\Media\Video\Transcoder\Delegates\NotificationDelegate;
use Minds\Core\Media\Video\Transcoder\TranscodeStates;
use Minds\Core\Media\Video\Transcoder\Transcode;
use Minds\Core\Events\EventsDispatcher;
use Minds\Core\EntitiesBuilder;
use Minds\Entities\Video;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
class NotificationDelegateSpec extends ObjectBehavior
{
private $transcodeStates;
private $eventsDispatcher;
private $entitiesBuilder;
public function let(TranscodeStates $transcodeStates, EventsDispatcher $eventsDispatcher, EntitiesBuilder $entitiesBuilder)
{
$this->beConstructedWith($transcodeStates, $eventsDispatcher, $entitiesBuilder);
$this->transcodeStates = $transcodeStates;
$this->eventsDispatcher = $eventsDispatcher;
$this->entitiesBuilder = $entitiesBuilder;
}
private function mockFetchVideo()
{
$this->entitiesBuilder->single('123')
->shouldBeCalled()
->willReturn(
(new Video)
->set('guid', '123')
->set('owner_guid', '456')
);
}
public function it_is_initializable()
{
$this->shouldHaveType(NotificationDelegate::class);
}
public function it_should_send_notification_of_completed()
{
$transcode = new Transcode();
$transcode->setGuid('123');
$this->mockFetchVideo();
$this->transcodeStates->getStatus(Argument::that(function ($video) {
return $video->getGuid() === '123';
}))
->shouldBeCalled()
->willReturn('completed');
$this->eventsDispatcher->trigger('notification', 'transcoder', Argument::type('array'))
->shouldBeCalled();
$this->onTranscodeCompleted($transcode);
}
public function it_should_send_notification_of_failed()
{
$transcode = new Transcode();
$transcode->setGuid('123');
$this->mockFetchVideo();
$this->transcodeStates->getStatus(Argument::that(function ($video) {
return $video->getGuid() === '123';
}))
->shouldBeCalled()
->willReturn('failed');
$this->eventsDispatcher->trigger('notification', 'transcoder', Argument::type('array'))
->shouldBeCalled();
$this->onTranscodeCompleted($transcode);
}
public function it_should_do_nothing()
{
$transcode = new Transcode();
$transcode->setGuid('123');
$this->mockFetchVideo();
$this->transcodeStates->getStatus(Argument::that(function ($video) {
return $video->getGuid() === '123';
}))
->shouldBeCalled()
->willReturn('transcoding');
$this->eventsDispatcher->trigger('notification', 'transcoder', Argument::type('array'))
->shouldNotBeCalled();
$this->onTranscodeCompleted($transcode);
}
}
......@@ -9,6 +9,7 @@ use Minds\Core\Media\Video\Transcoder\TranscodeProfiles;
use Minds\Core\Media\Video\Transcoder\TranscodeStorage\TranscodeStorageInterface;
use Minds\Core\Media\Video\Transcoder\TranscodeExecutors\TranscodeExecutorInterface;
use Minds\Core\Media\Video\Transcoder\Transcode;
use Minds\Core\Media\Video\Transcoder\Delegates\NotificationDelegate;
use Minds\Entities\Video;
use Minds\Entities\User;
use PhpSpec\ObjectBehavior;
......@@ -20,14 +21,16 @@ class ManagerSpec extends ObjectBehavior
private $queueDelegate;
private $transcodeStorage;
private $transcodeExecutor;
private $notificationDelegate;
public function let(Repository $repository, QueueDelegate $queueDelegate, TranscodeStorageInterface $transcodeStorage, TranscodeExecutorInterface $transcodeExecutor)
public function let(Repository $repository, QueueDelegate $queueDelegate, TranscodeStorageInterface $transcodeStorage, TranscodeExecutorInterface $transcodeExecutor, NotificationDelegate $notificationDelegate)
{
$this->beConstructedWith($repository, $queueDelegate, $transcodeStorage, $transcodeExecutor);
$this->beConstructedWith($repository, $queueDelegate, $transcodeStorage, $transcodeExecutor, $notificationDelegate);
$this->repository = $repository;
$this->queueDelegate = $queueDelegate;
$this->transcodeStorage = $transcodeStorage;
$this->transcodeExecutor = $transcodeExecutor;
$this->notificationDelegate = $notificationDelegate;
}
public function it_is_initializable()
......@@ -121,6 +124,7 @@ class ManagerSpec extends ObjectBehavior
public function it_should_execute_the_transcode()
{
$transcode = new Transcode();
$transcode->setGuid('123');
$this->repository->update(Argument::that(function ($transcode) {
return $transcode->getStatus() === 'transcoding';
......@@ -136,6 +140,10 @@ class ManagerSpec extends ObjectBehavior
}), [ 'progress', 'status' ])
->shouldBeCalled();
// Check for future transcodes is called
$this->notificationDelegate->onTranscodeCompleted($transcode)
->shouldBeCalled();
$this->transcode($transcode);
}
}
......@@ -38,6 +38,10 @@ class RepositorySpec extends ObjectBehavior
->shouldBeCalled()
->willReturn(new TranscodeProfiles\X264_360p());
$transcode->getStatus()
->shouldBeCalled()
->willReturn('created');
$this->db->request(Argument::that(function ($prepared) {
return true;
}))
......
<?php
namespace Spec\Minds\Core\Media\Video\Transcoder;
use Minds\Core\Media\Video\Transcoder\TranscodeStates;
use Minds\Core\Media\Video\Transcoder\Repository;
use Minds\Core\Media\Video\Transcoder\Transcode;
use Minds\Core\Media\Video\Transcoder\TranscodeProfiles;
use Minds\Entities\Video;
use Minds\Common\Repository\Response;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
class TranscodeStatesSpec extends ObjectBehavior
{
private $repository;
public function let(Repository $repository)
{
$this->beConstructedWith($repository);
$this->repository = $repository;
}
public function it_is_initializable()
{
$this->shouldHaveType(TranscodeStates::class);
}
public function it_should_return_transcoding_status()
{
$video = new Video();
$video->set('guid', '123');
$this->repository->getList([ 'guid' => '123' ])
->shouldBeCalled()
->willReturn(new Response([
(new Transcode())
->setProfile(new TranscodeProfiles\X264_360p())
->setStatus('transcoding')
->setLastEventTimestampMs(round(microtime(true) * 1000)),
(new Transcode())
->setProfile(new TranscodeProfiles\X264_720p())
->setStatus('completed')
->setLastEventTimestampMs(round(microtime(true) * 1000))
]));
$this->getStatus($video)
->shouldReturn('transcoding');
}
public function it_should_declare_a_timeout()
{
$video = new Video();
$video->set('guid', '123');
$this->repository->getList([ 'guid' => '123' ])
->shouldBeCalled()
->willReturn(new Response([
(new Transcode())
->setProfile(new TranscodeProfiles\X264_360p())
->setStatus('transcoding')
->setLastEventTimestampMs(round(microtime(true) * 1000) - 700000),
(new Transcode())
->setProfile(new TranscodeProfiles\X264_720p())
->setStatus('transcoding')
->setLastEventTimestampMs(round(microtime(true) * 1000) - 700000)
]));
$this->getStatus($video)
->shouldReturn('failed');
}
public function it_should_return_completed_status()
{
$video = new Video();
$video->set('guid', '123');
$this->repository->getList([ 'guid' => '123' ])
->shouldBeCalled()
->willReturn(new Response([
(new Transcode())
->setProfile(new TranscodeProfiles\X264_360p())
->setStatus('completed')
->setLastEventTimestampMs(round(microtime(true) * 1000)),
(new Transcode())
->setProfile(new TranscodeProfiles\X264_720p())
->setStatus('completed')
->setLastEventTimestampMs(round(microtime(true) * 1000))
]));
$this->getStatus($video)
->shouldReturn('completed');
}
public function it_should_return_failed_status()
{
$video = new Video();
$video->set('guid', '123');
$this->repository->getList([ 'guid' => '123' ])
->shouldBeCalled()
->willReturn(new Response([
(new Transcode())
->setProfile(new TranscodeProfiles\X264_360p())
->setStatus('failed')
->setLastEventTimestampMs(round(microtime(true) * 1000)),
(new Transcode())
->setProfile(new TranscodeProfiles\X264_720p())
->setStatus('failed')
->setLastEventTimestampMs(round(microtime(true) * 1000))
]));
$this->getStatus($video)
->shouldReturn('failed');
}
}
Please register or to comment