Commit b53a8246 authored by Emiliano Balbuena's avatar Emiliano Balbuena

(refactor): Move delegate to manager

1 merge request!250(feat): Retry queue for ElasticSearch
Pipeline #69426215 passed with stages
in 8 minutes and 55 seconds
<?php
/**
* DispatchIndexDelegate
* @author edgebal
*/
namespace Minds\Core\Search\Delegates;
use Exception;
use Minds\Common\Urn;
use Minds\Core\Di\Di;
use Minds\Core\Events\EventsDispatcher;
use Minds\Core\Search\Index as SearchIndex;
use Minds\Core\Search\RetryQueue\Repository as RetryQueueRepository;
use Minds\Core\Search\RetryQueue\RetryQueueEntry;
class DispatchIndexDelegate
{
/** @var EventsDispatcher */
protected $eventsDispatcher;
/** @var SearchIndex */
protected $searchIndex;
/** @var RetryQueueRepository */
protected $retryQueueRepository;
/**
* DispatchIndexDelegate constructor.
* @param EventsDispatcher $eventsDispatcher
* @param SearchIndex $searchIndex
* @param RetryQueueRepository $retryQueue
*/
public function __construct(
$eventsDispatcher = null,
$searchIndex = null,
$retryQueue = null
)
{
$this->eventsDispatcher = $eventsDispatcher ?: Di::_()->get('EventsDispatcher');
$this->searchIndex = $searchIndex ?: Di::_()->get('Search\Index');
$this->retryQueueRepository = $retryQueue ?: new RetryQueueRepository();
}
/**
* @param $entity
* @return bool
* @throws Exception
*/
public function index($entity)
{
try {
$indexed = (bool) $this->searchIndex->index($entity);
} catch (Exception $e) {
error_log("[DispatchIndexDelegate] {$e}");
$indexed = false;
}
$urn = (string) (new Urn($entity->guid));
if ($indexed) {
$retryQueueEntry = new RetryQueueEntry();
$retryQueueEntry
->setEntityUrn($urn);
$this->retryQueueRepository->delete($retryQueueEntry);
} else {
$retryQueueEntry = $this->retryQueueRepository->get($urn);
$retries = $retryQueueEntry->getRetries() + 1;
$retryQueueEntry
->setLastRetry(time())
->setRetries($retries);
$retrySaved = $this->retryQueueRepository->add($retryQueueEntry);
if (!$retrySaved) {
error_log("[DispatchIndexDelegate] Critical: Cannot save retry to queue table: {$urn}");
} elseif ($retries < 5) {
error_log("[DispatchIndexDelegate] Warn: Re-queue: {$urn}");
$this->eventsDispatcher->trigger('search:index', 'all', [
'entity' => $entity
]);
}
}
return $indexed;
}
}
......@@ -4,27 +4,13 @@
*/
namespace Minds\Core\Search;
use Exception;
use Minds\Core;
use Minds\Core\Di\Di;
use Minds\Core\Events\Event;
use Minds\Entities;
class Events
{
/** @var Delegates\DispatchIndexDelegate */
protected $dispatchIndexDelegate;
/**
* Events constructor.
* @param Delegates\DispatchIndexDelegate $dispatchIndexDelegate
*/
public function __construct(
$dispatchIndexDelegate = null
)
{
$this->dispatchIndexDelegate = $dispatchIndexDelegate ?: new Delegates\DispatchIndexDelegate();
}
public function register()
{
/** @var Core\Events\Dispatcher $dispatcher */
......@@ -97,7 +83,27 @@ class Events
unserialize($params['entity']) :
$params['entity'];
$this->dispatchIndexDelegate->index($entity);
try {
$wasIndexed = (bool) Di::_()->get('Search\Index')
->index($entity);
} catch (Exception $e) {
error_log("[Search/Events/search:index:dispatch] {$e}");
$wasIndexed = false;
}
// BannedException will return null (which is also falsy)
// So we should retry only on non-null responses from index()
if ($wasIndexed !== null) {
/** @var Core\Search\RetryQueue\Manager $retryQueueManager */
$retryQueueManager = Di::_()->get('Search\RetryQueue\Manager');
if ($wasIndexed) {
$retryQueueManager->prune($entity);
} else {
$retryQueueManager->retry($entity);
}
}
} catch (\Exception $e) {
error_log('[Search/Events/search:index:dispatch] ' . get_class($e) . ': ' . $e->getMessage());
}
......
......@@ -93,7 +93,7 @@ class Index
}
}
} catch (BannedException $e) {
$result = false;
$result = null;
} catch (\Exception $e) {
error_log('[Search/Index] ' . get_class($e) . ": {$e->getMessage()}");
$result = false;
......
<?php
/**
* RetryQueueManager
* @author edgebal
*/
namespace Minds\Core\Search\RetryQueue;
use Exception;
use Minds\Common\Urn;
use Minds\Core\Di\Di;
use Minds\Core\Events\EventsDispatcher;
class Manager
{
/** @var EventsDispatcher */
protected $eventsDispatcher;
/** @var Repository */
protected $repository;
/**
* RetryQueueManager constructor.
* @param EventsDispatcher $eventsDispatcher
* @param Repository $repository
*/
public function __construct(
$eventsDispatcher = null,
$repository = null
)
{
$this->eventsDispatcher = $eventsDispatcher ?: Di::_()->get('EventsDispatcher');
$this->repository = $repository ?: new Repository();
}
/**
* @param mixed $entity
* @return bool
* @throws Exception
*/
public function prune($entity)
{
$urn = (string) (new Urn($entity->guid));
$retryQueueEntry = new RetryQueueEntry();
$retryQueueEntry
->setEntityUrn($urn);
return (bool) $this->repository->delete($retryQueueEntry);
}
/**
* @param mixed $entity
* @return bool
* @throws Exception
*/
public function retry($entity)
{
$urn = (string) (new Urn($entity->guid));
$retryQueueEntry = $this->repository->get($urn);
$retries = $retryQueueEntry->getRetries() + 1;
$retryQueueEntry
->setLastRetry(time())
->setRetries($retries);
$retrySaved = $this->repository->add($retryQueueEntry);
if (!$retrySaved) {
error_log("[RetryQueueManager] Critical: Cannot save retry to queue table: {$urn}");
} elseif ($retries < 5) {
error_log("[RetryQueueManager] Warn: Re-queue: {$urn}");
$this->eventsDispatcher->trigger('search:index', 'all', [
'entity' => $entity
]);
} else {
error_log("[RetryQueueManager] Critical: Too many retries indexing: {$urn}");
}
return $retrySaved;
}
}
......@@ -9,7 +9,6 @@
namespace Minds\Core\Search;
use Minds\Core\Di\Provider;
use Minds\Core\Search\Hashtags\Manager;
class SearchProvider extends Provider
{
......@@ -40,7 +39,11 @@ class SearchProvider extends Provider
}, ['useFactory' => true]);
$this->di->bind('Search\Hashtags\Manager', function ($di) {
return new Manager();
return new Hashtags\Manager();
}, ['useFactory' => true]);
$this->di->bind('Search\RetryQueue\Manager', function ($di) {
return new RetryQueue\Manager();
}, ['useFactory' => true]);
}
}
<?php
namespace Spec\Minds\Core\Search\Delegates;
namespace Spec\Minds\Core\Search\RetryQueue;
use Minds\Core\Events\EventsDispatcher;
use Minds\Core\Search\Delegates\DispatchIndexDelegate;
use Minds\Core\Search\Index as SearchIndex;
use Minds\Core\Search\RetryQueue\Repository as RetryQueueRepository;
use Minds\Core\Search\RetryQueue\Manager;
use Minds\Core\Search\RetryQueue\Repository;
use Minds\Core\Search\RetryQueue\RetryQueueEntry;
use Minds\Entities\Entity;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
class DispatchIndexDelegateSpec extends ObjectBehavior
class ManagerSpec extends ObjectBehavior
{
/** @var EventsDispatcher */
protected $eventsDispatcher;
/** @var SearchIndex */
protected $searchIndex;
/** @var RetryQueueRepository */
protected $retryQueueRepository;
/** @var Repository */
protected $repository;
function let(
EventsDispatcher $eventsDispatcher,
SearchIndex $searchIndex,
RetryQueueRepository $retryQueue
Repository $repository
)
{
$this->beConstructedWith($eventsDispatcher, $searchIndex, $retryQueue);
$this->beConstructedWith($eventsDispatcher, $repository);
$this->eventsDispatcher = $eventsDispatcher;
$this->searchIndex = $searchIndex;
$this->retryQueueRepository = $retryQueue;
$this->repository = $repository;
}
function it_is_initializable()
{
$this->shouldHaveType(DispatchIndexDelegate::class);
$this->shouldHaveType(Manager::class);
}
function it_should_index_and_delete(Entity $entity)
function it_should_prune(Entity $entity)
{
$this->searchIndex->index($entity)
->shouldBeCalled()
->willReturn(true);
$entity->get('guid')
->shouldBeCalled()
->willReturn('5000');
$this->retryQueueRepository->delete(Argument::type(RetryQueueEntry::class))
$this->repository->delete(Argument::type(RetryQueueEntry::class))
->shouldBeCalled()
->willReturn(true);
$this
->index($entity)
->prune($entity)
->shouldReturn(true);
}
function it_should_throw_during_index_and_requeue(Entity $entity, RetryQueueEntry $retryQueueEntry)
function it_should_retry(Entity $entity, RetryQueueEntry $retryQueueEntry)
{
$this->searchIndex->index($entity)
->shouldBeCalled()
->willReturn(false);
$entity->get('guid')
->shouldBeCalled()
->willReturn('5000');
$this->retryQueueRepository->get('urn:entity:5000')
$this->repository->get('urn:entity:5000')
->shouldBeCalled()
->willReturn($retryQueueEntry);
......@@ -85,7 +71,7 @@ class DispatchIndexDelegateSpec extends ObjectBehavior
->shouldBeCalled()
->willReturn($retryQueueEntry);
$this->retryQueueRepository->add($retryQueueEntry)
$this->repository->add($retryQueueEntry)
->shouldBeCalled()
->willReturn(true);
......@@ -96,7 +82,41 @@ class DispatchIndexDelegateSpec extends ObjectBehavior
->willReturn(true);
$this
->index($entity)
->shouldReturn(false);
->retry($entity)
->shouldReturn(true);
}
function it_should_not_retry_if_too_many_attempts(Entity $entity, RetryQueueEntry $retryQueueEntry)
{
$entity->get('guid')
->shouldBeCalled()
->willReturn('5000');
$this->repository->get('urn:entity:5000')
->shouldBeCalled()
->willReturn($retryQueueEntry);
$retryQueueEntry->getRetries()
->shouldBeCalled()
->willReturn(4);
$retryQueueEntry->setLastRetry(Argument::type('int'))
->shouldBeCalled()
->willReturn($retryQueueEntry);
$retryQueueEntry->setRetries(5)
->shouldBeCalled()
->willReturn($retryQueueEntry);
$this->repository->add($retryQueueEntry)
->shouldBeCalled()
->willReturn(true);
$this->eventsDispatcher->trigger('search:index', Argument::cetera())
->shouldNotBeCalled();
$this
->retry($entity)
->shouldReturn(true);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment