...
 
Commits (2)
......@@ -10,14 +10,12 @@ use Minds\Interfaces;
class Analytics extends Cli\Controller implements Interfaces\CliControllerInterface
{
private $start;
private $elasticsearch;
public function help($command = null)
{
switch ($command) {
case 'sync_activeUsers':
$this->out('Indexes user activity by guid and counts per day');
$this->out('--incremental sync current day for estimates of state change (overrides from and to params)');
$this->out('--from={timestamp} the day to start counting. Default is yesterday at midnight');
$this->out('--to={timestamp} the day to stop counting. Default is yesterday at midnight');
$this->out('--rangeOffset={number of days} the number of days to look back into the past. Default is 7');
......@@ -46,9 +44,17 @@ class Analytics extends Cli\Controller implements Interfaces\CliControllerInterf
{
error_reporting(E_ALL);
ini_set('display_errors', 1);
$estimate = false;
if ($this->getOpt('incremental')) {
$estimate = true;
$from = strtotime('midnight +1 day'); //run throughout the day, provides estimates
$to = $from;
} else {
$from = (strtotime('midnight', $this->getOpt('from')) ?: strtotime('midnight yesterday'));
$to = (strtotime('midnight', $this->getOpt('to')) ?: strtotime('midnight yesterday'));
}
$from = (strtotime('midnight', $this->getOpt('from')) ?: strtotime('midnight yesterday'));
$to = (strtotime('midnight', $this->getOpt('to')) ?: strtotime('midnight yesterday'));
$rangeOffset = getopt('rangeOffset') ?: 7;
$mode = strtolower($this->getOpt('mode')) ?: 'notify';
$this->out('Collecting user activity');
......@@ -61,7 +67,7 @@ class Analytics extends Cli\Controller implements Interfaces\CliControllerInterf
->sync();
if ($mode === 'notify') {
$this->out('Sending notifications');
$manager->emitStateChanges();
$manager->emitStateChanges($estimate);
}
$from = strtotime('+1 day', $from);
}
......
<?php
namespace Minds\Core\Analytics\Delegates;
use Minds\Core\Analytics\UserStates\UserState;
use Minds\Core\Events\Dispatcher;
use Minds\Core\Notification\Notification;
use Minds\Entities\User;
class UpdateUserStateEstimate
{
/** @var User */
private $user;
/** @var UserState */
private $userState;
/** @var int */
private $estimateStateChange;
public function __construct(UserState $userState, User $user = null)
{
$this->userState = $userState;
$this->user = $user ?? new User($userState->getUserGuid());
$this->estimateStateChange = 0;
}
public function update(): void
{
$this->deriveEstimateChange();
$this->updateUserEntity();
$this->sendStateChangeNotification();
}
private function deriveEstimateChange(): void
{
$this->estimateStateChange = UserState::stateChange($this->user->getUserStateToday(), $this->userState->getState());
}
private function updateUserEntity(): void
{
$this->user->setUserStateToday($this->userState->getState())
->setUserStateTodayUpdatedMs($this->userState->getReferenceDateMs())
->save();
}
private function sendStateChangeNotification(): void
{
$data = [
$this->user->getGUID(),
$this->user->getUserState(),
$this->user->getUserStateToday(),
$this->userState->getStateChange(),
$this->estimateStateChange
];
error_log(implode('|', $data));
if ($this->estimateStateChange < 0 && $this->userState->getStateChange() < 0) {
$notificationView = 'rewards_state_decrease_today';
Dispatcher::trigger('notification', 'reward', [
'to' => [
$this->userState->getUserGuid()
],
'from' => Notification::SYSTEM_ENTITY,
'notification_view' => $notificationView,
'params' => $this->userState->export()
]);
}
}
}
......@@ -21,5 +21,10 @@ class Events
$userState = Core\Analytics\UserStates\UserState::fromArray($event->getParameters());
(new Core\Analytics\Delegates\UpdateUserState($userState))->update();
});
$this->eventsDispatcher->register('user_state_change_estimate', 'all', function (Core\Events\Event $event) {
$userState = Core\Analytics\UserStates\UserState::fromArray($event->getParameters());
(new Core\Analytics\Delegates\UpdateUserStateEstimate($userState))->update();
});
}
}
......@@ -14,134 +14,58 @@ use Minds\Core\Data;
class ActiveUsersIterator implements \Iterator
{
private $cursor = -1;
private $period = 0;
private $item;
private $limit = 400;
private $partitions = 200;
private $page = -1;
/** @var UserState[] $data */
private $data = [];
private $active;
private $valid = true;
public function __construct($client = null)
/** @var UserActivityBuckets[] $data */
protected $data = [];
protected $valid = true;
protected $referenceDate;
protected $rangeOffset = 7;
/** @var Data\ElasticSearch\Client */
protected $client;
/** @var ActiveUsersQueryBuilder */
protected $queryBuilder;
protected $cursor = -1;
protected $partitions = 200;
protected $page = -1;
public function __construct(Data\ElasticSearch\Client $client = null, ActiveUsersQueryBuilder $queryBuilder = null)
{
$this->client = $client ?: Di::_()->get('Database\ElasticSearch');
$this->position = 0;
$this->queryBuilder = $queryBuilder ?? new ActiveUsersQueryBuilder();
$this->queryBuilder->setPartitions($this->partitions);
$this->referenceDate = strtotime('midnight');
$this->rangeOffset = 7;
}
//Sets the last day for the iterator (ie, today)
public function setReferenceDate($referenceDate)
/**
* Sets the last day for the iterator (ie, today)
* @param int $referenceDate
* @return ActiveUsersIterator
*/
public function setReferenceDate(int $referenceDate): self
{
$this->referenceDate = $referenceDate;
return $this;
}
//Sets the number of days to look backwards
public function setRangeOffset($rangeOffset)
/**
* Sets the number of days to look backwards
* @param int $rangeOffset
* @return ActiveUsersIterator
*/
public function setRangeOffset(int $rangeOffset): self
{
$this->rangeOffset = $rangeOffset;
return $this;
}
//Builds up a sub aggregate that counts the days for a bucket with the same name
private function buildBucketCountAggregation($name)
{
return [
'sum_bucket' => [
'buckets_path' => "$name-bucket>_count",
],
];
}
//Builds up a sub aggregate that splits a user's activity into days
private function buildBucketAggregation($name, $dayOffset)
{
$toOffset = $dayOffset - 1;
//Set times to midnight of the current day until midnight of the next day(end of day);
$from = strtotime("-$dayOffset day", $this->referenceDate);
$to = strtotime("-$toOffset day", $this->referenceDate);
return [
'date_range' => [
'field' => '@timestamp',
'ranges' => [
[
'from' => $from * 1000, //eg 2019-01-24 00:00:00
'to' => $to * 1000, //eg 2019-01-25 00:00:00
],
],
],
];
}
public function get()
public function get(): bool
{
if ($this->page++ >= $this->partitions - 1) {
$this->valid = false;
return;
return false;
}
//Set the range for the entire query day - offset to day + 1
$from = strtotime("-$this->rangeOffset day", $this->referenceDate);
$to = strtotime('+1 day', $this->referenceDate);
$bucketAggregations = [];
//for the range of (reference day ) - offset (midnight) to (reference day) + 1 offset (midnight the next day)
foreach (range(0, $this->rangeOffset) as $dayOffset) {
$bucketAggregations["day-$dayOffset-bucket"] = $this->buildBucketAggregation("day-$dayOffset", $dayOffset);
$bucketAggregations["day-$dayOffset"] = $this->buildBucketCountAggregation("day-$dayOffset");
}
$must = [
['match_phrase' => [
'action.keyword' => [
'query' => 'active',
],
]],
['range' => [
'@timestamp' => [
'from' => $from * 1000, //midnight of the first day
'to' => $to * 1000, //midnight of the last day
'format' => 'epoch_millis',
],
]],
];
//split up users by user guid
$aggs = [
'users' => [
'terms' => [
'field' => 'user_guid.keyword',
'size' => 5000,
'include' => [
'partition' => $this->page,
'num_partitions' => $this->partitions,
],
],
'aggs' => $bucketAggregations,
],
];
$query = [
'index' => 'minds-metrics-*',
'size' => '0',
'body' => [
'query' => [
'bool' => [
'must' => $must,
],
],
'aggs' => $aggs,
],
];
$query = $this->queryBuilder->setFrom($from)->setTo($this->referenceDate)->setPage($this->page)->query();
$prepared = new Core\Data\ElasticSearch\Prepared\Search();
$prepared->query($query);
......@@ -150,7 +74,6 @@ class ActiveUsersIterator implements \Iterator
$result = $this->client->request($prepared);
} catch (\Exception $e) {
error_log($e);
return false;
}
......@@ -158,17 +81,17 @@ class ActiveUsersIterator implements \Iterator
return false;
}
//Cook down the verbose elastic search into just the data we need
/* Derive activity data from the ES results */
foreach ($result['aggregations']['users']['buckets'] as $userActivityByDay) {
$userActivityBuckets = (new UserActivityBuckets())
->setUserGuid($userActivityByDay['key'])
->setReferenceDateMs($this->referenceDate * 1000);
$days = [];
foreach (range(0, $this->rangeOffset) as $dayOffset) {
$days[$dayOffset] = [
'reference_date' => $userActivityByDay["day-$dayOffset-bucket"]['buckets'][0]['from'],
'count' => $userActivityByDay["day-$dayOffset"]['value'],
foreach ($this->queryBuilder->buckets() as $bucketTime) {
$days[] = [
'reference_date' => $userActivityByDay[$bucketTime]['buckets'][0]['from'],
'count' => $userActivityByDay["count-$bucketTime"]['value'],
];
}
......@@ -178,6 +101,8 @@ class ActiveUsersIterator implements \Iterator
if ($this->cursor >= count($this->data)) {
$this->get();
}
return true;
}
/**
......@@ -194,7 +119,7 @@ class ActiveUsersIterator implements \Iterator
/**
* Get the current cursor's data.
*
* @return mixed
* @return UserActivityBuckets
*/
public function current()
{
......
<?php
namespace Minds\Core\Analytics\UserStates;
use Minds\Core\Time;
class ActiveUsersQueryBuilder
{
protected $index = 'minds-metrics-*';
protected $from = 0;
protected $to = 0;
protected $page = 0;
protected $partitions = 200;
protected $bucketSize = Time::ONE_DAY;
public function __construct()
{
$this->from = strtotime('-7 days');
$this->to = time();
}
/**
* Set the current page to query
* @param int $page
* @return ActiveUsersQueryBuilder
*/
public function setPage(int $page): self
{
$this->page = $page;
return $this;
}
/**
* Set the number of partitions
* @param $partitions
* @return ActiveUsersQueryBuilder
*/
public function setPartitions($partitions): self
{
$this->partitions = $partitions;
return $this;
}
/**
* Set the from timestamp
* @param int $from
* @return ActiveUsersQueryBuilder
*/
public function setFrom(int $from): self
{
$this->from = $from;
return $this;
}
/**
* Set the to timestamp
* @param int $to
* @return ActiveUsersQueryBuilder
*/
public function setTo(int $to): self
{
$this->to = $to;
return $this;
}
/**
* Set the bucket size (interval in seconds)
* @param int $bucketSize
* @return ActiveUsersQueryBuilder
*/
public function setBucketSize(int $bucketSize): self
{
$this->bucketSize = $bucketSize;
return $this;
}
/**
* Return an ES query for the given parameters
* @return array
*/
public function query(): array
{
return [
'index' => $this->index,
'size' => '0',
'body' => [
'query' => [
'bool' => [
'must' => $this->must(),
],
],
'aggs' => $this->aggregations(),
],
];
}
/**
* Return a must clause for the query
* @return array
*/
private function must(): array
{
return [
['match_phrase' => [
'action.keyword' => [
'query' => 'active',
],
]],
['range' => [
'@timestamp' => [
'from' => $this->from * 1000,
'to' => $this->to * 1000,
'format' => 'epoch_millis',
],
]],
];
}
/**
* Builds up an aggregate that splits a user's activity into buckets
* @return array
*/
private function aggregations(): array
{
return [
'users' => [
'terms' => [
'field' => 'user_guid.keyword',
'size' => 5000,
'include' => [
'partition' => $this->page,
'num_partitions' => $this->partitions,
],
],
'aggs' => $this->bucketAggregations(),
],
];
}
/**
* Return the aggregations clauses for the ES query
* @return array
*/
private function bucketAggregations(): array
{
$bucketAggregations = [];
foreach ($this->buckets() as $bucketTime) {
$nextBucketTime = $bucketTime + $this->bucketSize;
$bucketAggregations[$bucketTime] = $this->rangeAggregation($bucketTime, $nextBucketTime);
$bucketAggregations["count-$bucketTime"] = $this->sumAggregation($bucketTime);
}
return $bucketAggregations;
}
/**
* Return an array of bucket timestamps
* @return array
*/
public function buckets(): array
{
return Time::intervalsBetween($this->from, $this->to, $this->bucketSize);
}
/**
* @param int $from
* @param int $to
* @return array
*/
private function rangeAggregation(int $from, int $to): array
{
return [
'date_range' => [
'field' => '@timestamp',
'ranges' => [
[
'from' => $from * 1000,
'to' => $to * 1000,
],
],
],
];
}
/**
* Builds up an aggregate that counts buckets with the same name
* @param string $name
* @return array
*/
private function sumAggregation(string $name): array
{
return [
'sum_bucket' => [
'buckets_path' => "$name>_count",
],
];
}
}
......@@ -2,6 +2,7 @@
namespace Minds\Core\Analytics\UserStates;
use Minds\Core\Data\ElasticSearch\Client;
use Minds\Core\Di\Di;
use Minds\Core\Queue;
......@@ -28,6 +29,9 @@ class Manager
/** @var array $pendingBulkInserts * */
private $pendingBulkInserts = [];
/** @var Client */
private $es;
public function __construct($client = null, $index = null, $queue = null, $activeUsersIterator = null, $userStateIterator = null)
{
$this->es = $client ?: Di::_()->get('Database\ElasticSearch');
......@@ -67,7 +71,7 @@ class Manager
$this->bulk();
}
public function emitStateChanges()
public function emitStateChanges(bool $estimate = false)
{
$this->userStateIterator->setReferenceDate($this->referenceDate);
......@@ -75,9 +79,16 @@ class Manager
foreach ($this->userStateIterator as $userState) {
//Reindex with previous state
$this->index($userState);
$this->queue->send([
'user_state_change' => $userState->export(),
]);
$payload = [
'user_state_change' => $userState->export()
];
if ($estimate) {
$payload['estimate'] = true;
}
$this->queue->send($payload);
}
$this->bulk();
}
......
......@@ -4,7 +4,6 @@ namespace Minds\Core\Analytics\UserStates;
use Minds\Core;
use Minds\Core\Di\Di;
use Minds\Core\Data;
/*
* Iterator that loops through users and counts their action.active entries for the past N days
......@@ -15,15 +14,15 @@ use Minds\Core\Data;
class UserStateIterator implements \Iterator
{
private $cursor = -1;
private $period = 0;
private $item;
private $limit = 400;
private $partitions = 200;
private $page = -1;
private $data = [];
private $active;
private $valid = true;
private $client;
private $position;
private $referenceDate;
public function __construct($client = null)
{
$this->client = $client ?: Di::_()->get('Database\ElasticSearch');
......@@ -31,20 +30,22 @@ class UserStateIterator implements \Iterator
$this->referenceDate = strtotime('midnight');
}
//Sets the last day for the iterator (ie, today)
public function setReferenceDate($referenceDate)
/**
* Sets the last day for the iterator (ie, today)
* @param $referenceDate
* @return $this
*/
public function setReferenceDate($referenceDate): self
{
$this->referenceDate = $referenceDate;
return $this;
}
public function get()
public function get(): bool
{
if ($this->page++ >= $this->partitions - 1) {
$this->valid = false;
return;
return false;
}
//Set the range for the entire query day - offset to day + 1
......@@ -52,13 +53,15 @@ class UserStateIterator implements \Iterator
$to = $this->referenceDate;
$must = [
['range' => [
'reference_date' => [
'gte' => $from * 1000, //midnight of the first day
'lte' => $to * 1000, //midnight of the last day
'format' => 'epoch_millis',
],
]],
[
'range' => [
'reference_date' => [
'gte' => $from * 1000, //midnight of the first day
'lte' => $to * 1000, //midnight of the last day
'format' => 'epoch_millis',
],
]
],
];
//split up users by user guid
......@@ -113,7 +116,6 @@ class UserStateIterator implements \Iterator
$result = $this->client->request($prepared);
} catch (\Exception $e) {
error_log($e);
return false;
}
......@@ -143,6 +145,8 @@ class UserStateIterator implements \Iterator
if ($this->cursor >= count($this->data)) {
$this->get();
}
return true;
}
/**
......@@ -158,8 +162,7 @@ class UserStateIterator implements \Iterator
/**
* Get the current cursor's data.
*
* @return mixed
* @return UserState
*/
public function current()
{
......@@ -168,7 +171,6 @@ class UserStateIterator implements \Iterator
/**
* Get cursor's key.
*
* @return mixed
*/
public function key()
......@@ -189,10 +191,9 @@ class UserStateIterator implements \Iterator
/**
* Checks if the cursor is valid.
*
* @return bool
*/
public function valid()
public function valid(): bool
{
return $this->valid && isset($this->data[$this->cursor]);
}
......
......@@ -15,9 +15,10 @@ class UserStateChange implements Interfaces\QueueRunner
{
$client = Queue\Client::Build();
$client->setQueue('UserStateChanges')
->receive(function ($data) use ($mailer) {
->receive(function ($data) {
$data = $data->getData();
$result = Dispatcher::trigger('user_state_change', $data['user_state_change']['state'], $data['user_state_change']);
$event = isset($data['estimate']) ? 'user_state_change_estimate' : 'user_state_change';
Dispatcher::trigger($event, $data['user_state_change']['state'], $data['user_state_change']);
});
}
}
<?php
namespace Minds\Core;
class Time
{
const HALF_HOUR = 1800;
const ONE_HOUR = 3600;
const TWO_HOUR = 7200;
const ONE_DAY = 86400;
const ONE_MIN = 60;
const FIVE_MIN = 300;
const TEN_MIN = 600;
const FIFTEEN_MIN = 900;
/**
* Return the interval timestamp a timestamp is within
* @param int $ts
* @param int $interval
* @return int
*/
public static function toInterval(int $ts, int $interval)
{
return $ts - ($ts % $interval);
}
/**
* Return an array of interval values between two timestamps
* @param int $start
* @param int $end
* @param int $interval
* @return array
*/
public static function intervalsBetween(int $start, int $end, int $interval): array
{
$startTs = self::toInterval($start, $interval);
$endTs = self::toInterval($end, $interval);
/* Exclusive not inclusive range should ignore first interval value */
if ($startTs < $endTs)
$startTs += $interval;
$intervals = [];
while ($startTs <= $endTs) {
$intervals[] = $startTs;
$startTs += $interval;
}
return $intervals;
}
}
......@@ -56,6 +56,8 @@ class User extends \ElggUser
$this->attributes['toaster_notifications'] = 1;
$this->attributes['user_state'] = Core\Analytics\UserStates\UserState::STATE_UNKNOWN;
$this->attributes['user_state_updated_ms'] = 0;
$this->attributes['user_state_today'] = Core\Analytics\UserStates\UserState::STATE_UNKNOWN;
$this->attributes['user_state_today_updated_ms'] = 0;
parent::initializeAttributes();
}
......@@ -774,6 +776,8 @@ class User extends \ElggUser
$export['rating'] = $this->getRating();
$export['user_state'] = $this->getUserState();
$export['user_state_updated_ms'] = (int) $this->getUserStateUpdatedMs();
$export['user_state_today'] = $this->getUserStateToday();
$export['user_state_today_updated_ms'] = (int)$this->getUserStateTodayUpdatedMs();
return $export;
}
......@@ -1142,6 +1146,42 @@ class User extends \ElggUser
return $this;
}
/**
* @return string
*/
public function getUserStateToday(): string
{
return $this->user_state_today;
}
/**
* @param string $state
* @return $this
*/
public function setUserStateToday(string $state): self
{
$this->user_state_today = $state;
return $this;
}
/**
* @return int
*/
public function getUserStateTodayUpdatedMs(): int
{
return $this->user_state_today_updated_ms;
}
/**
* @param int $msTimestamp
* @return $this
*/
public function setUserStateTodayUpdatedMs(int $msTimestamp): self
{
$this->user_state_today_updated_ms = $msTimestamp;
return $this;
}
/**
* Returns btc_address
* @return string
......