diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 7d1c621d..20c19233 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -23,37 +23,32 @@ jobs:
         with:
           python-version: 3.7
       - name: Run tests
-        run: make test
-        run: make e2e_test
+        run: make test && make e2e_test
      - name: Run with setup-python 3.8
        uses: ./
        with:
          python-version: 3.8
      - name: Run tests
-        run: make test
-        run: make e2e_test
+        run: make test && make e2e_test
      - name: Run with setup-python 3.9
        uses: ./
        with:
          python-version: 3.9
      - name: Run tests
-        run: make test
-        run: make e2e_test
+        run: make test && make e2e_test
      - name: Run with setup-python 3.10
        uses: ./
        with:
          python-version: 3.10
      - name: Run tests
-        run: make test
-        run: make e2e_test
+        run: make test && make e2e_test
      - name: Run with setup-python 3.11
        uses: ./
        with:
          python-version: 3.11
      - name: Run tests
-        run: make test
-        run: make e2e_test
\ No newline at end of file
+        run: make test && make e2e_test
diff --git a/segment/analytics/__init__.py b/segment/analytics/__init__.py
index 230769b5..db25221d 100644
--- a/segment/analytics/__init__.py
+++ b/segment/analytics/__init__.py
@@ -15,6 +15,7 @@
 gzip = Client.DefaultConfig.gzip
 timeout = Client.DefaultConfig.timeout
 max_retries = Client.DefaultConfig.max_retries
+keep_alive = Client.DefaultConfig.keep_alive
 
 default_client = None
 
@@ -48,6 +49,10 @@ def screen(*args, **kwargs):
     """Send a screen call."""
     return _proxy('screen', *args, **kwargs)
 
+
+def object(*args, **kwargs):
+    """Send an object call."""
+    return _proxy('object', *args, **kwargs)
 
 
 def flush():
     """Tell the client to flush."""
diff --git a/segment/analytics/client.py b/segment/analytics/client.py
index 515da899..ac0f028a 100644
--- a/segment/analytics/client.py
+++ b/segment/analytics/client.py
@@ -21,11 +21,13 @@ class Client(object):
     class DefaultConfig(object):
         write_key = None
         host = None
+        endpoint = None
         on_error = None
         debug = False
         send = True
         sync_mode = False
         max_queue_size = 10000
+        max_object_queue_size = 10
         gzip = False
         timeout = 15
         max_retries = 10
@@ -33,6 +35,7 @@ class DefaultConfig(object):
         thread = 1
         upload_interval = 0.5
         upload_size = 100
+        keep_alive = True
 
     """Create a new Segment client."""
     log = logging.getLogger('segment')
@@ -51,7 +54,8 @@ def __init__(self,
                  proxies=DefaultConfig.proxies,
                  thread=DefaultConfig.thread,
                  upload_size=DefaultConfig.upload_size,
-                 upload_interval=DefaultConfig.upload_interval,):
+                 upload_interval=DefaultConfig.upload_interval,
+                 keep_alive=DefaultConfig.keep_alive):
         require('write_key', write_key, str)
 
         self.queue = queue.Queue(max_queue_size)
@@ -64,6 +68,7 @@ def __init__(self,
         self.gzip = gzip
         self.timeout = timeout
         self.proxies = proxies
+        self.keep_alive = keep_alive
 
         if debug:
             self.log.setLevel(logging.DEBUG)
@@ -85,7 +90,7 @@ def __init__(self,
                     self.queue, write_key, host=host, on_error=on_error,
                     upload_size=upload_size, upload_interval=upload_interval,
                     gzip=gzip, retries=max_retries, timeout=timeout,
-                    proxies=proxies,
+                    proxies=proxies, keep_alive=keep_alive,
                 )
                 self.consumers.append(consumer)
@@ -239,31 +244,47 @@ def screen(self, user_id=None, category=None, name=None, properties=None,
 
         return self._enqueue(msg)
 
-    def _enqueue(self, msg):
-        """Push a new `msg` onto the queue, return `(success, msg)`"""
-        timestamp = msg['timestamp']
-        if timestamp is None:
-            timestamp = datetime.utcnow().replace(tzinfo=tzutc())
-        message_id = msg.get('messageId')
-        if message_id is None:
-            message_id = uuid4()
-
-        require('integrations', msg['integrations'], dict)
-        require('type', msg['type'], str)
-        require('timestamp', timestamp, datetime)
-        require('context', msg['context'], dict)
-
-        # add common
-        timestamp = guess_timezone(timestamp)
-        msg['timestamp'] = timestamp.isoformat(timespec='milliseconds')
-        msg['messageId'] = stringify_id(message_id)
-        msg['context']['library'] = {
-            'name': 'analytics-python',
-            'version': VERSION
+    def object(self, object_id=None, collection=None, properties=None):
+        properties = properties or {}
+        require('object_id', object_id, ID_TYPES)
+        require('collection', collection, str)
+        require('properties', properties, dict)
+
+        msg = {
+            'id': object_id,
+            'collection': collection,
+            'properties': properties,
+            'type': 'object'
         }
-        msg['userId'] = stringify_id(msg.get('userId', None))
-        msg['anonymousId'] = stringify_id(msg.get('anonymousId', None))
+
+        return self._enqueue(msg)
+
+    def _enqueue(self, msg):
+        """Push a new `msg` onto the queue, return `(success, msg)`"""
+        if msg['type'] != 'object':
+            timestamp = msg['timestamp']
+            if timestamp is None:
+                timestamp = datetime.utcnow().replace(tzinfo=tzutc())
+            message_id = msg.get('messageId')
+            if message_id is None:
+                message_id = uuid4()
+
+            require('integrations', msg['integrations'], dict)
+            require('type', msg['type'], str)
+            require('timestamp', timestamp, datetime)
+            require('context', msg['context'], dict)
+
+            # add common
+            timestamp = guess_timezone(timestamp)
+            msg['timestamp'] = timestamp.isoformat(timespec='milliseconds')
+            msg['messageId'] = stringify_id(message_id)
+            msg['context']['library'] = {
+                'name': 'analytics-python',
+                'version': VERSION
+            }
+
+            msg['userId'] = stringify_id(msg.get('userId', None))
+            msg['anonymousId'] = stringify_id(msg.get('anonymousId', None))
 
         msg = clean(msg)
         self.log.debug('queueing: %s', msg)
@@ -294,9 +315,8 @@ def _enqueue(self, msg):
 
     def flush(self):
         """Forces a flush from the internal queue to the server"""
-        queue = self.queue
-        size = queue.qsize()
-        queue.join()
+        size = self.queue.qsize()
+        self.queue.join()
         # Note that this message may not be precise, because of threading.
         self.log.debug('successfully flushed about %s items.', size)
diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py
index 27586284..bdcd9ad5 100644
--- a/segment/analytics/consumer.py
+++ b/segment/analytics/consumer.py
@@ -4,10 +4,13 @@
 import backoff
 import json
 
-from segment.analytics.request import post, APIError, DatetimeSerializer
+from segment.analytics.request import post, APIError, DatetimeSerializer, start_object_session, post_object, \
+    keep_alive_object_session
 
 from queue import Empty
 
+from segment.analytics.utils import remove_trailing_slash
+
 MAX_MSG_SIZE = 32 << 10
 
 # Our servers only accept batches less than 500KB. Here limit is set slightly
@@ -19,9 +22,9 @@ class Consumer(Thread):
     """Consumes the messages from the client's queue."""
     log = logging.getLogger('segment')
 
-    def __init__(self, queue, write_key, upload_size=100, host=None,
+    def __init__(self, queue, write_key, upload_size=100, host=None, endpoint=None,
                  on_error=None, upload_interval=0.5, gzip=False, retries=10,
-                 timeout=15, proxies=None):
+                 timeout=15, proxies=None, keep_alive=True):
         """Create a consumer thread."""
         Thread.__init__(self)
         # Make consumer a daemon thread so that it doesn't block program exit
@@ -29,7 +32,8 @@ def __init__(self, queue, write_key, upload_size=100, host=None,
         self.upload_size = upload_size
         self.upload_interval = upload_interval
         self.write_key = write_key
-        self.host = host
+        self.host = host or 'https://api.segment.io'
+        self.endpoint = endpoint or '/v1/batch'
         self.on_error = on_error
         self.queue = queue
         self.gzip = gzip
@@ -41,12 +45,22 @@ def __init__(self, queue, write_key, upload_size=100, host=None,
         self.retries = retries
         self.timeout = timeout
         self.proxies = proxies
+        # Object API
+        self.object_host = 'https://objects-bulk-api.segmentapis.com'
+        self.object_start_endpoint = '/v0/start'
+        self.object_upload_endpoint = '/v0/upload/{sync_id}'
+        self.object_keep_alive_endpoint = '/v0/keep-alive/{sync_id}'
+        self.session_id = None
+        self.keep_alive = keep_alive
+        self.keep_alive_time = None
+        self.keep_alive_interval = 9.5 * 60  # 9.5 minutes in seconds
 
     def run(self):
         """Runs the consumer."""
         self.log.debug('consumer is running...')
         while self.running:
             self.upload()
+            self.check_keep_alive()
 
         self.log.debug('consumer exited.')
 
@@ -57,12 +71,14 @@ def pause(self):
     def upload(self):
         """Upload the next batch of items, return whether successful."""
         success = False
-        batch = self.next()
-        if len(batch) == 0:
+        batch, objects = self.next()
+        total_size = len(batch) + len(objects)
+        if total_size == 0:
             return False
 
         try:
             self.request(batch)
+            self.request_object(objects)
             success = True
         except Exception as e:
             self.log.error('error uploading: %s', e)
@@ -71,7 +87,7 @@ def upload(self):
             self.on_error(e, batch)
         finally:
             # mark items as acknowledged from queue
-            for _ in batch:
+            for _ in range(total_size):
                 self.queue.task_done()
             return success
 
@@ -79,6 +95,7 @@ def next(self):
        """Return the next batch of items to upload."""
        queue = self.queue
        items = []
+        objects = []
 
        start_time = monotonic.monotonic()
        total_size = 0
@@ -95,8 +112,13 @@ def next(self):
                    if item_size > MAX_MSG_SIZE:
                        self.log.error(
                            'Item exceeds 32kb limit, dropping. (%s)', str(item))
+                        queue.task_done()  # Call task_done() for the dropped item
                        continue
-                    items.append(item)
+                    if item['type'] == 'object':
+                        objects.append(item)
+                    else:
+                        items.append(item)
+                    # TODO: total_size needs to account for both APIs
                    total_size += item_size
                    if total_size >= BATCH_SIZE_LIMIT:
                        self.log.debug(
@@ -105,12 +127,56 @@ def next(self):
            except Empty:
                break
            except Exception as e:
+                queue.task_done()  # Call task_done() for items that raise exceptions
                self.log.exception('Exception: %s', e)
 
-        return items
+        return items, objects
+
+    def check_keep_alive(self):
+        """Check if the object session needs to be kept alive."""
+        if not self.keep_alive or self.keep_alive_time is None or self.session_id is None:
+            return
+        now = monotonic.monotonic()
+        elapsed = now - self.keep_alive_time
+        if elapsed >= self.keep_alive_interval:
+            self.log.debug('keep alive interval passed')
+            self.request_keep_alive()
+
+    def request_keep_alive(self):
+        """Keep the object session alive."""
+        if self.session_id is None:
+            self.log.error('No object session id found, cannot keep alive session')
+            return
+        url = remove_trailing_slash(self.object_host) + self.object_keep_alive_endpoint.format(sync_id=self.session_id)
+        self._request(keep_alive_object_session, url, self.write_key, self.session_id,
+                      timeout=self.timeout, proxies=self.proxies)
+        self.keep_alive_time = monotonic.monotonic()
+
+    def request_object(self, batch):
+        """Upload object data to the object API."""
+        if len(batch) == 0:
+            return
+
+        # TODO add session keep alive logic
+        if self.session_id is None:
+            url = remove_trailing_slash(self.object_host) + self.object_start_endpoint
+            self.session_id = self._request(start_object_session, url, self.write_key,
+                                            timeout=self.timeout, proxies=self.proxies)
+
+        url = remove_trailing_slash(self.object_host) + self.object_upload_endpoint.format(sync_id=self.session_id)
+        self._request(post_object, url, batch, write_key=self.write_key, timeout=self.timeout, proxies=self.proxies)
+        self.keep_alive_time = monotonic.monotonic()
 
     def request(self, batch):
         """Attempt to upload the batch and retry before raising an error """
+        if len(batch) == 0:
+            return
+
+        self._request(post, self.write_key, self.host, endpoint=self.endpoint, gzip=self.gzip,
+                      timeout=self.timeout, batch=batch, proxies=self.proxies)
+
+    def _request(self, request_func, *args, **kwargs):
+        """Request wrapper that will retry before raising an error."""
 
         def fatal_exception(exc):
             if isinstance(exc, APIError):
@@ -128,7 +194,6 @@ def fatal_exception(exc):
                     max_tries=self.retries + 1,
                     giveup=fatal_exception)
         def send_request():
-            post(self.write_key, self.host, gzip=self.gzip,
-                 timeout=self.timeout, batch=batch, proxies=self.proxies)
+            return request_func(*args, **kwargs)
 
-        send_request()
+        return send_request()
diff --git a/segment/analytics/request.py b/segment/analytics/request.py
index d1901f79..a6981544 100644
--- a/segment/analytics/request.py
+++ b/segment/analytics/request.py
@@ -13,12 +13,28 @@
 _session = sessions.Session()
 
 
-def post(write_key, host=None, gzip=False, timeout=15, proxies=None, **kwargs):
+def send_post(*args, **kwargs):
+    log = logging.getLogger('segment')
+    res = _session.post(*args, **kwargs)
+
+    if res.status_code in [200, 201]:
+        log.debug('request sent successfully')
+        return res
+
+    try:
+        payload = res.json()
+        log.debug('received response: %s', payload)
+        raise APIError(res.status_code, payload['code'], payload['message'])
+    except ValueError:
+        raise APIError(res.status_code, 'unknown', res.text)
+
+
+def post(write_key, host=None, endpoint=None, gzip=False, timeout=15, proxies=None, **kwargs):
     """Post the `kwargs` to the API"""
     log = logging.getLogger('segment')
     body = kwargs
     body["sentAt"] = datetime.utcnow().replace(tzinfo=tzutc()).isoformat()
-    url = remove_trailing_slash(host or 'https://api.segment.io') + '/v1/batch'
+    url = remove_trailing_slash(host or 'https://api.segment.io') + (endpoint or '/v1/batch')
     auth = HTTPBasicAuth(write_key, '')
     data = json.dumps(body, cls=DatetimeSerializer)
     log.debug('making request: %s', data)
@@ -39,25 +55,91 @@ def post(write_key, host=None, gzip=False, timeout=15, proxies=None, **kwargs):
         "data": data,
         "auth": auth,
         "headers": headers,
-        "timeout": 15,
+        "timeout": timeout,
     }
 
     if proxies:
         kwargs['proxies'] = proxies
 
-    res = _session.post(url, data=data, auth=auth,
-                        headers=headers, timeout=timeout)
+    return send_post(url, **kwargs)
 
-    if res.status_code == 200:
-        log.debug('data uploaded successfully')
-        return res
 
-    try:
-        payload = res.json()
-        log.debug('received response: %s', payload)
-        raise APIError(res.status_code, payload['code'], payload['message'])
-    except ValueError:
-        raise APIError(res.status_code, 'unknown', res.text)
+def start_object_session(url, write_key, timeout=15, proxies=None):
+    """Start a new session with Segment's Object API."""
+    log = logging.getLogger('segment')
+    auth = HTTPBasicAuth(write_key, '')
+    headers = {
+        'Content-Type': 'application/json',
+        'User-Agent': 'analytics-python/' + VERSION
+    }
+
+    kwargs = {
+        "auth": auth,
+        "headers": headers,
+        "timeout": timeout,
+    }
+
+    if proxies:
+        kwargs['proxies'] = proxies
+
+    res = send_post(url, **kwargs)
+    data = res.json()
+    sync_id = data['sync_id']
+    log.debug('object session start: sync_id = %s', sync_id)
+    return sync_id
+
+
+def keep_alive_object_session(url, write_key, sync_id, timeout=15, proxies=None):
+    log = logging.getLogger('segment')
+    auth = HTTPBasicAuth(write_key, '')
+    headers = {
+        'Content-Type': 'application/json',
+        'User-Agent': 'analytics-python/' + VERSION
+    }
+
+    kwargs = {
+        "auth": auth,
+        "headers": headers,
+        "timeout": timeout,
+    }
+
+    if proxies:
+        kwargs['proxies'] = proxies
+
+    send_post(url, **kwargs)
+    log.debug('object session kept alive: sync_id = %s', sync_id)
+
+
+def post_object(url, batch, write_key=None, timeout=15, proxies=None):
+    log = logging.getLogger('segment')
+    auth = HTTPBasicAuth(write_key, '')
+
+    data_str = "\n".join(json.dumps(obj) for obj in batch)
+    buf = BytesIO()
+    with GzipFile(fileobj=buf, mode='w') as gz:
+        # 'data_str' was produced by json.dumps(),
+        # whose default encoding is utf-8.
+        gz.write(data_str.encode('utf-8'))
+    data = buf.getvalue()
+
+    headers = {
+        'Content-Encoding': 'gzip',
+        'Content-Type': 'application/json',
+        'User-Agent': 'analytics-python/' + VERSION
+    }
+
+    kwargs = {
+        "auth": auth,
+        "headers": headers,
+        "timeout": timeout,
+        "data": data,
+    }
+
+    if proxies:
+        kwargs['proxies'] = proxies
+
+    log.debug('making request to Segment Object API: %s', data_str)
+
+    return send_post(url, **kwargs)
 
 
 class APIError(Exception):
diff --git a/simulator.py b/simulator.py
index df83de67..e50d6060 100644
--- a/simulator.py
+++ b/simulator.py
@@ -12,6 +12,7 @@ def json_hash(str):
     if str:
         return json.loads(str)
 
+
 # analytics -method=<method> -segment-write-key=<write_key> [options]
 
@@ -28,7 +29,7 @@ def json_hash(str):
 parser.add_argument('--event', help='the event name to send with the event')
 
 parser.add_argument(
-    '--properties', help='the event properties to send (JSON-encoded)')
+    '--properties', help='the event or object properties to send (JSON-encoded)')
 
 parser.add_argument(
     '--name', help='name of the screen or page to send with the message')
@@ -38,6 +39,11 @@ def json_hash(str):
 
 parser.add_argument('--groupId', help='the group id')
 
+parser.add_argument('--objectId', help='the ID of an Object')
+
+parser.add_argument(
+    '--collection', help='the collection an Object belongs to')
+
 options = parser.parse_args()
 
@@ -70,6 +76,11 @@ def group():
         json_hash(options.context), anonymous_id=options.anonymousId)
 
 
+# Used plural name because `object` is a built-in name in Python
+def objects():
+    analytics.object(options.objectId, options.collection, json_hash(options.properties))
+
+
 def unknown():
     print()
 
@@ -88,7 +99,8 @@ def unknown():
     "page": page,
     "screen": screen,
     "identify": identify,
-    "group": group
+    "group": group,
+    "object": objects
 }
 
 func = switcher.get(options.type)
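Reviewer note: a minimal usage sketch of the Object API surface this diff adds, using the module-level proxy pattern the library already exposes. The write key and object values below are hypothetical placeholders, not part of the diff.

    import segment.analytics as analytics

    # Hypothetical write key for illustration only.
    analytics.write_key = 'YOUR_WRITE_KEY'

    # New config flag from this diff; defaults to True. When enabled, the
    # consumer pings the object session roughly every 9.5 minutes to keep it open.
    analytics.keep_alive = True

    # Track calls are unchanged and still go to <host>/v1/batch.
    analytics.track('user_123', 'Signed Up')

    # New in this diff: enqueue an object for the bulk Object API. Objects skip
    # the timestamp/messageId/context handling applied to track/identify messages
    # and are routed by the consumer to https://objects-bulk-api.segmentapis.com
    # (/v0/start to open a session, then /v0/upload/{sync_id}).
    analytics.object('room_1001', 'rooms', {'name': 'Conference Room A', 'capacity': 12})

    # flush() blocks until the shared queue drains; events and objects share one
    # queue and are split into separate batches in Consumer.next().
    analytics.flush()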