...
 
Commits (8)
......@@ -20,6 +20,7 @@ class Analytics extends Cli\Controller implements Interfaces\CliControllerInterf
$this->out('--to={timestamp} the day to stop counting. Default is yesterday at midnight');
$this->out('--rangeOffset={number of days} the number of days to look back into the past. Default is 7');
$this->out('--mode={silent | notify} silent mode does not send emails when running batches to re-index. Notify sends the notifications. Default is notify');
$this->out('--debug Enable debug output');
break;
case 'sync_graphs':
$this->out('sync graphs between es and cassandra');
......@@ -45,10 +46,11 @@ class Analytics extends Cli\Controller implements Interfaces\CliControllerInterf
error_reporting(E_ALL);
ini_set('display_errors', 1);
$estimate = false;
$debug = !is_null($this->getOpt('debug'));
if ($this->getOpt('incremental')) {
$estimate = true;
$from = strtotime('midnight +1 day'); //run throughout the day, provides estimates
$from = strtotime('midnight'); //run throughout the day, provides estimates
$to = $from;
} else {
$from = (strtotime('midnight', $this->getOpt('from')) ?: strtotime('midnight yesterday'));
......@@ -62,8 +64,9 @@ class Analytics extends Cli\Controller implements Interfaces\CliControllerInterf
while ($from <= $to) {
$this->out('Syncing for ' . gmdate('c', $from));
$manager = new Core\Analytics\UserStates\Manager();
$manager->setReferenceDate($from)
->setRangeOffset($rangeOffset)
$manager->setDebug($debug)
->setReferenceTimestamp($from)
->setNumberOfIntervals($rangeOffset)
->sync();
if ($mode === 'notify') {
$this->out('Sending notifications');
......@@ -164,4 +167,19 @@ class Analytics extends Cli\Controller implements Interfaces\CliControllerInterf
}
$this->out('Done');
}
public function fakeActivity()
{
$user_guid = $this->getOpt('user_guid');
$timestamp = $this->getOpt('timestamp_ms') ?? time() * 1000;
$event = new Core\Analytics\Metrics\Event();
$event->setType('action')
->setProduct('platform')
->setUserGuid((string)$user_guid)
->setTimestamp($timestamp)
->setAction('fake');
$this->out(print_r($event->push(), true));
}
}
......@@ -33,7 +33,7 @@ class UpdateUserState
private function updateUserEntity(): void
{
$this->user->setUserState($this->userState->getState())
->setUserStateUpdatedMs($this->userState->getReferenceDateMs())
->setUserStateUpdatedMs(time() * 1000)
->save();
}
......
......@@ -38,7 +38,7 @@ class UpdateUserStateEstimate
private function updateUserEntity(): void
{
$this->user->setUserStateToday($this->userState->getState())
->setUserStateTodayUpdatedMs($this->userState->getReferenceDateMs())
->setUserStateTodayUpdatedMs(time() * 1000)
->save();
}
......
......@@ -27,7 +27,7 @@ use Minds\Core;
*/
class Event
{
private $elatic;
private $elastic;
private $index = 'minds-metrics-';
protected $data;
......@@ -44,9 +44,17 @@ class Event
return $this;
}
public function setTimestamp(int $timestamp)
{
$this->data['@timestamp'] = $timestamp;
return $this;
}
public function push()
{
$this->data['@timestamp'] = (int) microtime(true) * 1000;
if (!isset($this->data['@timestamp'])) {
$this->data['@timestamp'] = (int)microtime(true) * 1000;
}
$this->data['user_agent'] = $this->getUserAgent();
$this->data['ip_hash'] = $this->getIpHash();
......
......@@ -17,43 +17,55 @@ class ActiveUsersIterator implements \Iterator
/** @var UserActivityBuckets[] $data */
protected $data = [];
protected $valid = true;
protected $referenceDate;
protected $rangeOffset = 7;
protected $referenceTimestamp;
protected $numberOfIntervals = 7;
/** @var Data\ElasticSearch\Client */
protected $client;
/** @var ActiveUsersQueryBuilder */
protected $queryBuilder;
protected $cursor = -1;
protected $partitions = 200;
protected $partitions = 1;
protected $page = -1;
protected $intervalSize = Core\Time::ONE_DAY;
public function __construct(Data\ElasticSearch\Client $client = null, ActiveUsersQueryBuilder $queryBuilder = null)
{
$this->client = $client ?: Di::_()->get('Database\ElasticSearch');
$this->queryBuilder = $queryBuilder ?? new ActiveUsersQueryBuilder();
$this->queryBuilder->setPartitions($this->partitions);
$this->referenceDate = strtotime('midnight');
$this->referenceTimestamp = strtotime('midnight');
}
/**
* Sets the last day for the iterator (ie, today)
* @param int $referenceDate
* Sets the interval/bucket size
* @param int $intervalSize
* @return self $this
*/
public function setIntervalSize(int $intervalSize): self
{
$this->intervalSize = $intervalSize;
return $this;
}
/**
* Sets the reference for the last interval
* @param int $referenceTimestamp
* @return ActiveUsersIterator
*/
public function setReferenceDate(int $referenceDate): self
public function setReferenceTimestamp(int $referenceTimestamp): self
{
$this->referenceDate = $referenceDate;
$this->referenceTimestamp = Core\Time::toInterval($referenceTimestamp, $this->intervalSize);
return $this;
}
/**
* Sets the number of days to look backwards
* @param int $rangeOffset
* Sets the number of intervals to look backwards
* @param int $numberOfIntervals
* @return ActiveUsersIterator
*/
public function setRangeOffset(int $rangeOffset): self
public function setNumberOfIntervals(int $numberOfIntervals): self
{
$this->rangeOffset = $rangeOffset;
$this->numberOfIntervals = $numberOfIntervals;
return $this;
}
......@@ -64,8 +76,13 @@ class ActiveUsersIterator implements \Iterator
return false;
}
$from = strtotime("-$this->rangeOffset day", $this->referenceDate);
$query = $this->queryBuilder->setFrom($from)->setTo($this->referenceDate)->setPage($this->page)->query();
$from = $this->referenceTimestamp - ($this->intervalSize * $this->numberOfIntervals);
$to = $this->referenceTimestamp + ($this->intervalSize - 1); // Last timestamp of reference interval
$query = $this->queryBuilder
->setFrom($from)
->setTo($to)
->setPage($this->page)
->query();
$prepared = new Core\Data\ElasticSearch\Prepared\Search();
$prepared->query($query);
......@@ -83,10 +100,6 @@ class ActiveUsersIterator implements \Iterator
/* Derive activity data from the ES results */
foreach ($result['aggregations']['users']['buckets'] as $userActivityByDay) {
$userActivityBuckets = (new UserActivityBuckets())
->setUserGuid($userActivityByDay['key'])
->setReferenceDateMs($this->referenceDate * 1000);
$days = [];
foreach ($this->queryBuilder->buckets() as $bucketTime) {
$days[] = [
......@@ -95,7 +108,15 @@ class ActiveUsersIterator implements \Iterator
];
}
$userActivityBuckets->setActiveDaysBuckets($days);
$userActivityBuckets = (new UserActivityBuckets())
->setUserGuid($userActivityByDay['key'])
->setReferenceDateMs($this->referenceTimestamp * 1000)
->setActiveDaysBuckets($days);
usort($days, function ($a, $b) {
return $a['reference_date'] <=> $b['reference_date'];
});
$this->data[] = $userActivityBuckets;
}
......
......@@ -11,11 +11,11 @@ class Manager
/** @var Queue\RabbitMQ\Client */
private $queue;
/** @var int $referenceDate */
private $referenceDate;
/** @var int $referenceTimestamp */
private $referenceTimestamp;
/** @var int $rangeOffet */
private $rangeOffset = 7;
/** @var int $numberOfIntervals */
private $numberOfIntervals = 7;
/** @var string $userStateIndex */
private $userStateIndex;
......@@ -32,6 +32,8 @@ class Manager
/** @var Client */
private $es;
private $debug = false;
public function __construct($client = null, $index = null, $queue = null, $activeUsersIterator = null, $userStateIterator = null)
{
$this->es = $client ?: Di::_()->get('Database\ElasticSearch');
......@@ -41,24 +43,28 @@ class Manager
$this->userStateIterator = $userStateIterator ?: new UserStateIterator();
}
public function setReferenceDate($referenceDate)
public function setReferenceTimestamp($referenceDate): self
{
$this->referenceDate = $referenceDate;
$this->referenceTimestamp = $referenceDate;
return $this;
}
public function setRangeOffset($rangeOffset)
public function setNumberOfIntervals($numberOfIntervals): self
{
$this->$rangeOffset = $rangeOffset;
$this->$numberOfIntervals = $numberOfIntervals;
return $this;
}
public function setDebug(bool $debug): self
{
$this->debug = $debug;
return $this;
}
public function sync()
{
$this->activeUsersIterator->setReferenceDate($this->referenceDate);
$this->activeUsersIterator->setRangeOffset($this->rangeOffset);
$this->activeUsersIterator->setReferenceTimestamp($this->referenceTimestamp);
$this->activeUsersIterator->setNumberOfIntervals($this->numberOfIntervals);
foreach ($this->activeUsersIterator as $activeUser) {
$userState = (new UserState())
......@@ -73,12 +79,14 @@ class Manager
public function emitStateChanges(bool $estimate = false)
{
$this->userStateIterator->setReferenceDate($this->referenceDate);
$this->userStateIterator->setReferenceTimestamp($this->referenceTimestamp);
$this->queue->setQueue('UserStateChanges');
foreach ($this->userStateIterator as $userState) {
//Reindex with previous state
$this->index($userState);
$this->debugLog($userState);
if (!empty($userState->getPreviousState())) {
$this->index($userState);
}
$payload = [
'user_state_change' => $userState->export()
......@@ -100,7 +108,7 @@ class Manager
*
* @return bool
*/
public function index($userState)
public function index(UserState $userState)
{
$this->pendingBulkInserts[] = [
'update' => [
......@@ -111,7 +119,7 @@ class Manager
];
$this->pendingBulkInserts[] = [
'doc' => $userState->export(),
'doc' => $userState->export(false),
'doc_as_upsert' => true,
];
......@@ -128,8 +136,16 @@ class Manager
public function bulk()
{
if (count($this->pendingBulkInserts) > 0) {
$this->es->bulk(['body' => $this->pendingBulkInserts]);
$result = $this->es->bulk(['body' => $this->pendingBulkInserts]);
$this->pendingBulkInserts = [];
}
}
private function debugLog($var): void
{
if ($this->debug) {
$caller = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 2)[1]['function'];
error_log($caller . ':' . print_r($var, true));
}
}
}
......@@ -43,17 +43,14 @@ class UserActivityBuckets
/** @var array $daysActiveBuckets */
private $daysActiveBuckets = [];
//Values derived from buckets
private $numberOfDays = 0;
private $mostRecentDayCount = 0;
private $oldestDayCount = 0;
private $activityPercentage = 0;
public function isNewUser(): bool
{
$guid = new Guid();
$newUserThresholdTimestamp = strtotime('-'.static::NEW_USER_AGE_HOURS.' hours', $this->referenceDateMs / 1000);
$maxNewUserThresholdTimestamp = strtotime('+'.static::NEW_USER_AGE_HOURS.' hours', $newUserThresholdTimestamp);
$maxNewUserThresholdTimestamp = $this->referenceDateMs / 1000;
$newUserThresholdTimestamp = strtotime('-' . static::NEW_USER_AGE_HOURS . ' hours', $maxNewUserThresholdTimestamp);
$referenceGuid = $guid->generate($newUserThresholdTimestamp * 1000);
$maxReferenceGuid = $guid->generate($maxNewUserThresholdTimestamp * 1000);
......@@ -68,8 +65,7 @@ class UserActivityBuckets
{
$this->daysActiveBuckets = $buckets;
$this->numberOfDays = count($this->daysActiveBuckets);
$this->mostRecentDayCount = $this->daysActiveBuckets[0]['count'];
$this->oldestDayCount = $this->daysActiveBuckets[$this->numberOfDays - 1]['count'];
$this->mostRecentDayCount = end($this->daysActiveBuckets)['count'];
return $this;
}
......@@ -77,8 +73,7 @@ class UserActivityBuckets
public function getActiveDayCount(): int
{
$activeDayCount = 0;
//increment activity for each day save for the oldest day used to to determine if a user went cold
for ($dayIndex = 0; $dayIndex <= $this->numberOfDays - 2; ++$dayIndex) {
for ($dayIndex = 0; $dayIndex <= $this->numberOfDays - 1; ++$dayIndex) {
if ($this->daysActiveBuckets[$dayIndex]['count'] > 0) {
++$activeDayCount;
}
......@@ -89,11 +84,12 @@ class UserActivityBuckets
public function getActivityPercentage(): string
{
return number_format($this->getActiveDayCount() / ($this->numberOfDays - 1), 2);
return number_format($this->getActiveDayCount() / ($this->numberOfDays), 2);
}
public function getState() : string
{
// How do we reach new user state if we have no activity???
if ($this->isNewUser()) {
return UserState::STATE_NEW;
} elseif ($this->getActivityPercentage() >= static::THRESHOLD_CORE_USER) {
......@@ -102,7 +98,7 @@ class UserActivityBuckets
return UserState::STATE_CASUAL;
} elseif ($this->mostRecentDayCount > 0 && $this->getActiveDayCount() == 1) {
return UserState::STATE_RESURRECTED;
} elseif ($this->oldestDayCount > 0 && $this->getActiveDayCount() == 0) {
} elseif ($this->getActiveDayCount() == 0) {
return UserState::STATE_COLD;
} elseif ($this->getActiveDayCount() >= 1) {
return UserState::STATE_CURIOUS;
......
......@@ -17,7 +17,6 @@ use Minds\Traits\MagicAttributes;
* @method int getReferenceDateMs()
* @method UserState setState(string $state)
* @method string getState()
* @method UserState setPreviousState(string $state)
* @method string getPreviousState()
* @method UserState setActivityPercentage(float $activityPercentage)
* @method float getActivityPercentage()
......@@ -64,10 +63,9 @@ class UserState
/** @var int $stateChange */
private $stateChange;
public function export(): array
public function export(bool $includeNullValues = true): array
{
$this->deriveStateChange();
return [
$data = [
'user_guid' => $this->userGuid,
'reference_date' => $this->referenceDateMs,
'state' => $this->state,
......@@ -77,6 +75,23 @@ class UserState
'previous_reward_factor' => RewardFactor::getForUserState($this->previousState),
'state_change' => $this->stateChange
];
if (!$includeNullValues) {
foreach ($data as $key => $value) {
if (is_null($value)) {
unset($data[$key]);
}
}
}
return $data;
}
public function setPreviousState($previousState): self
{
$this->previousState = $previousState;
$this->deriveStateChange();
return $this;
}
private function deriveStateChange(): void
......@@ -99,7 +114,6 @@ class UserState
->setReferenceDateMs($data['reference_date'])
->setState($data['state'])
->setPreviousState($data['previous_state'])
->setActivityPercentage($data['activity_percentage'])
->setStateChange(self::stateChange($data['previous_state'], $data['state']));
->setActivityPercentage($data['activity_percentage']);
}
}
......@@ -21,23 +21,35 @@ class UserStateIterator implements \Iterator
private $client;
private $position;
private $referenceDate;
private $referenceTimestamp;
private $intervalSize = Core\Time::ONE_DAY;
public function __construct($client = null)
{
$this->client = $client ?: Di::_()->get('Database\ElasticSearch');
$this->position = 0;
$this->referenceDate = strtotime('midnight');
$this->referenceTimestamp = strtotime('midnight');
}
/**
* Sets the last day for the iterator (ie, today)
* @param $referenceDate
* @return $this
* Sets the last interval timestamp for the iterator
* @param int $referenceTimestamp
* @return self $this
*/
public function setReferenceDate($referenceDate): self
public function setReferenceTimestamp(int $referenceTimestamp): self
{
$this->referenceDate = $referenceDate;
$this->referenceTimestamp = $referenceTimestamp;
return $this;
}
/**
* Sets the interval/bucket size
* @param int $intervalSize
* @return self $this
*/
public function setIntervalSize(int $intervalSize): self
{
$this->intervalSize = $intervalSize;
return $this;
}
......@@ -48,9 +60,9 @@ class UserStateIterator implements \Iterator
return false;
}
//Set the range for the entire query day - offset to day + 1
$from = strtotime('-1 day', $this->referenceDate);
$to = $this->referenceDate;
/* Round reference to interval and include previous interval for previous state */
$to = Core\Time::toInterval($this->referenceTimestamp, $this->intervalSize);
$from = $to - $this->intervalSize;
$must = [
[
......@@ -76,11 +88,6 @@ class UserStateIterator implements \Iterator
],
],
'aggs' => [
'unique_state' => [
'cardinality' => [
'field' => 'state',
],
],
'latest_state' => [
'top_hits' => [
'docvalue_fields' => ['state'],
......@@ -120,25 +127,25 @@ class UserStateIterator implements \Iterator
}
if ($result && $result['aggregations']['user_state']['buckets']) {
$document = $result['aggregations']['user_state']['buckets'][0]['latest_state']['hits']['hits'][0];
if ($result['aggregations']['user_state']['buckets'][0]['unique_state']['value'] == 2) {
//Fire off state changes
$previousDocument = $result['aggregations']['user_state']['buckets'][0]['latest_state']['hits']['hits'][1];
$userState = (new UserState())
->setUserGuid($document['_source']['user_guid'])
->setReferenceDateMs($document['_source']['reference_date'])
->setState($document['_source']['state'])
->setPreviousState($previousDocument['_source']['state'])
->setActivityPercentage($document['_source']['activity_percentage']);
$this->data[] = $userState;
} elseif ($result['aggregations']['user_state']['buckets'][0]['doc_count'] == 1) {
//Fire off single states (new user, resurrected or a gap)
$userState = (new UserState())
->setUserGuid($document['_source']['user_guid'])
->setReferenceDateMs($document['_source']['reference_date'])
->setState($document['_source']['state'])
->setActivityPercentage($document['_source']['activity_percentage']);
$this->data[] = $userState;
if (isset($result['aggregations']['user_state']['buckets'][0]['latest_state']['hits']['hits'][0])) {
$document = $result['aggregations']['user_state']['buckets'][0]['latest_state']['hits']['hits'][0];
if (isset($result['aggregations']['user_state']['buckets'][0]['latest_state']['hits']['hits'][1])) {
$previousDocument = $result['aggregations']['user_state']['buckets'][0]['latest_state']['hits']['hits'][1];
$userState = (new UserState())
->setUserGuid($document['_source']['user_guid'])
->setReferenceDateMs($document['_source']['reference_date'])
->setState($document['_source']['state'])
->setPreviousState($previousDocument['_source']['state'])
->setActivityPercentage($document['_source']['activity_percentage']);
$this->data[] = $userState;
} else {
$userState = (new UserState())
->setUserGuid($document['_source']['user_guid'])
->setReferenceDateMs($document['_source']['reference_date'])
->setState($document['_source']['state'])
->setActivityPercentage($document['_source']['activity_percentage']);
$this->data[] = $userState;
}
}
}
......
......@@ -55,8 +55,7 @@ class UserActivityBucketsSpec extends ObjectBehavior
['count' => 0],
['count' => 0],
['count' => 0],
['count' => 0],
['count' => 1],
['count' => 0]
];
$userGuid = $guid->generate(strtotime('-7 days') * 1000);
$this->setReferenceDateMs(strtotime('midnight') * 1000)
......@@ -73,14 +72,13 @@ class UserActivityBucketsSpec extends ObjectBehavior
$guid = new Guid();
$userGuid = $guid->generate(strtotime('-1 month') * 1000);
$activeDayBuckets = [
['count' => 1],
['count' => 26],
['count' => 0],
['count' => 0],
['count' => 0],
['count' => 0],
['count' => 0],
['count' => 0],
['count' => 26],
];
$this->setReferenceDateMs(strtotime('midnight') * 1000)
......@@ -98,7 +96,6 @@ class UserActivityBucketsSpec extends ObjectBehavior
$guid = new Guid();
$userGuid = $guid->generate(strtotime('-1 month') * 1000);
$activeDayBuckets = [
['count' => 1],
['count' => 13],
['count' => 0],
['count' => 0],
......@@ -122,7 +119,6 @@ class UserActivityBucketsSpec extends ObjectBehavior
$guid = new Guid();
$userGuid = $guid->generate(strtotime('-1 month') * 1000);
$activeDayBuckets = [
['count' => 0],
['count' => 1],
['count' => 1],
['count' => 0],
......@@ -147,11 +143,10 @@ class UserActivityBucketsSpec extends ObjectBehavior
$guid = new Guid();
$userGuid = $guid->generate(strtotime('-1 month') * 1000);
$activeDayBuckets = [
['count' => 0],
['count' => 1],
['count' => 0],
['count' => 0],
['count' => 0],
['count' => 1],
['count' => 0],
['count' => 0],
['count' => 0],
......@@ -172,7 +167,6 @@ class UserActivityBucketsSpec extends ObjectBehavior
$guid = new Guid();
$userGuid = $guid->generate(strtotime('-1 month') * 1000);
$activeDayBuckets = [
['count' => 1],
['count' => 13],
['count' => 5],
['count' => 9],
......@@ -198,7 +192,6 @@ class UserActivityBucketsSpec extends ObjectBehavior
$referenceDate = strtotime('midnight');
$userGuid = $guid->generate(strtotime('-12 hours', $referenceDate) * 1000);
$activeDayBuckets = [
['count' => 1],
['count' => 13],
['count' => 5],
['count' => 9],
......@@ -224,7 +217,6 @@ class UserActivityBucketsSpec extends ObjectBehavior
$referenceDate = strtotime('midnight');
$userGuid = $guid->generate(strtotime('-24 hours -1 minute', $referenceDate) * 1000);
$activeDayBuckets = [
['count' => 0],
['count' => 0],
['count' => 1],
['count' => 0],
......