From b371824c6e83c053f5bed975ae0d577405bd5fc7 Mon Sep 17 00:00:00 2001 From: Logan-Ruf Date: Sat, 30 Sep 2023 15:35:05 -0700 Subject: [PATCH 1/6] Added Object API data to client and simulator --- segment/analytics/__init__.py | 4 ++++ segment/analytics/client.py | 38 ++++++++++++++++++++++++++++++++++- segment/analytics/consumer.py | 5 +++-- segment/analytics/request.py | 4 ++-- simulator.py | 16 +++++++++++++-- 5 files changed, 60 insertions(+), 7 deletions(-) diff --git a/segment/analytics/__init__.py b/segment/analytics/__init__.py index 230769b5..8b923450 100644 --- a/segment/analytics/__init__.py +++ b/segment/analytics/__init__.py @@ -48,6 +48,10 @@ def screen(*args, **kwargs): """Send a screen call.""" return _proxy('screen', *args, **kwargs) +def object(*args, **kwargs): + """Send a object call.""" + return _proxy('object', *args, **kwargs) + def flush(): """Tell the client to flush.""" diff --git a/segment/analytics/client.py b/segment/analytics/client.py index 515da899..a6db4920 100644 --- a/segment/analytics/client.py +++ b/segment/analytics/client.py @@ -21,11 +21,14 @@ class Client(object): class DefaultConfig(object): write_key = None host = None + objects_host = 'https://objects.segment.com' + objects_endpoint = '/v1/set' on_error = None debug = False send = True sync_mode = False max_queue_size = 10000 + max_object_queue_size = 10 gzip = False timeout = 15 max_retries = 10 @@ -40,8 +43,11 @@ class DefaultConfig(object): def __init__(self, write_key=DefaultConfig.write_key, host=DefaultConfig.host, + objects_host=DefaultConfig.objects_host, + objects_endpoint=DefaultConfig.objects_endpoint, debug=DefaultConfig.debug, max_queue_size=DefaultConfig.max_queue_size, + max_object_queue_size=DefaultConfig.max_object_queue_size, send=DefaultConfig.send, on_error=DefaultConfig.on_error, gzip=DefaultConfig.gzip, @@ -55,12 +61,15 @@ def __init__(self, require('write_key', write_key, str) self.queue = queue.Queue(max_queue_size) + self.object_queue = queue.Queue(max_object_queue_size) self.write_key = write_key self.on_error = on_error self.debug = debug self.send = send self.sync_mode = sync_mode self.host = host + self.objects_host = objects_host + self.objects_endpoint = objects_endpoint self.gzip = gzip self.timeout = timeout self.proxies = proxies @@ -88,6 +97,14 @@ def __init__(self, proxies=proxies, ) self.consumers.append(consumer) + object_consumer = Consumer( + self.object_queue, write_key, host=objects_host, + endpoint=objects_endpoint, on_error=on_error, + upload_size=upload_size, upload_interval=upload_interval, + gzip=gzip, retries=max_retries, timeout=timeout, + proxies=proxies, + ) + self.consumers.append(object_consumer) # if we've disabled sending, just don't start the consumer if send: @@ -239,6 +256,21 @@ def screen(self, user_id=None, category=None, name=None, properties=None, return self._enqueue(msg) + def object(self, object_id=None, collection=None, properties=None): + properties = properties or {} + require('object_id', object_id, ID_TYPES) + require('collection', collection, str) + require('properties', properties, dict) + + msg = { + 'objectId': object_id, + 'collection': collection, + 'properties': properties, + 'type': 'object' + } + + return self._enqueue(msg) + def _enqueue(self, msg): """Push a new `msg` onto the queue, return `(success, msg)`""" timestamp = msg['timestamp'] @@ -284,8 +316,12 @@ def _enqueue(self, msg): return True, msg + if msg.get('type') == 'object': + current_queue = self.object_queue + else: + current_queue = self.queue try: - self.queue.put(msg, block=False) + current_queue.put(msg, block=False) self.log.debug('enqueued %s.', msg['type']) return True, msg except queue.Full: diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py index 27586284..7e7625c6 100644 --- a/segment/analytics/consumer.py +++ b/segment/analytics/consumer.py @@ -19,7 +19,7 @@ class Consumer(Thread): """Consumes the messages from the client's queue.""" log = logging.getLogger('segment') - def __init__(self, queue, write_key, upload_size=100, host=None, + def __init__(self, queue, write_key, upload_size=100, host=None, endpoint=None, on_error=None, upload_interval=0.5, gzip=False, retries=10, timeout=15, proxies=None): """Create a consumer thread.""" @@ -30,6 +30,7 @@ def __init__(self, queue, write_key, upload_size=100, host=None, self.upload_interval = upload_interval self.write_key = write_key self.host = host + self.endpoint = endpoint self.on_error = on_error self.queue = queue self.gzip = gzip @@ -128,7 +129,7 @@ def fatal_exception(exc): max_tries=self.retries + 1, giveup=fatal_exception) def send_request(): - post(self.write_key, self.host, gzip=self.gzip, + post(self.write_key, self.host, endpoint=self.endpoint, gzip=self.gzip, timeout=self.timeout, batch=batch, proxies=self.proxies) send_request() diff --git a/segment/analytics/request.py b/segment/analytics/request.py index d1901f79..a7855c1d 100644 --- a/segment/analytics/request.py +++ b/segment/analytics/request.py @@ -13,12 +13,12 @@ _session = sessions.Session() -def post(write_key, host=None, gzip=False, timeout=15, proxies=None, **kwargs): +def post(write_key, host=None, endpoint=None, gzip=False, timeout=15, proxies=None, **kwargs): """Post the `kwargs` to the API""" log = logging.getLogger('segment') body = kwargs body["sentAt"] = datetime.utcnow().replace(tzinfo=tzutc()).isoformat() - url = remove_trailing_slash(host or 'https://api.segment.io') + '/v1/batch' + url = remove_trailing_slash(host or 'https://api.segment.io') + (endpoint or '/v1/batch') auth = HTTPBasicAuth(write_key, '') data = json.dumps(body, cls=DatetimeSerializer) log.debug('making request: %s', data) diff --git a/simulator.py b/simulator.py index df83de67..e50d6060 100644 --- a/simulator.py +++ b/simulator.py @@ -12,6 +12,7 @@ def json_hash(str): if str: return json.loads(str) + # analytics -method= -segment-write-key= [options] @@ -28,7 +29,7 @@ def json_hash(str): parser.add_argument('--event', help='the event name to send with the event') parser.add_argument( - '--properties', help='the event properties to send (JSON-encoded)') + '--properties', help='the event or object properties to send (JSON-encoded)') parser.add_argument( '--name', help='name of the screen or page to send with the message') @@ -38,6 +39,11 @@ def json_hash(str): parser.add_argument('--groupId', help='the group id') +parser.add_argument('--objectId', help='the ID of an Object') + +parser.add_argument( + '--collection', help='the collection an Object belongs to') + options = parser.parse_args() @@ -70,6 +76,11 @@ def group(): json_hash(options.context), anonymous_id=options.anonymousId) +# Used plural name because object is a reserved word in Python +def objects(): + analytics.object(options.objectId, options.collection, json_hash(options.properties), json_hash(options.context)) + + def unknown(): print() @@ -88,7 +99,8 @@ def unknown(): "page": page, "screen": screen, "identify": identify, - "group": group + "group": group, + "object": objects } func = switcher.get(options.type) From cc72b319a04913de2208427e69330aabe9d37831 Mon Sep 17 00:00:00 2001 From: Logan-Ruf Date: Mon, 9 Oct 2023 13:39:09 -0700 Subject: [PATCH 2/6] Fix yml formatting --- .github/workflows/tests.yml | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 7d1c621d..20c19233 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -23,37 +23,32 @@ jobs: with: python-version: 3.7 - name: Run tests - run: make test - run: make e2e_test + run: make test && make e2e_test - name: Run with setup-python 3.8 uses: ./ with: python-version: 3.8 - name: Run tests - run: make test - run: make e2e_test + run: make test && make e2e_test - name: Run with setup-python 3.9 uses: ./ with: python-version: 3.9 - name: Run tests - run: make test - run: make e2e_test + run: make test && make e2e_test - name: Run with setup-python 3.10 uses: ./ with: python-version: 3.10 - name: Run tests - run: make test - run: make e2e_test + run: make test && make e2e_test - name: Run with setup-python 3.11 uses: ./ with: python-version: 3.11 - name: Run tests - run: make test - run: make e2e_test \ No newline at end of file + run: make test && make e2e_test From 8615cd415dd8b077192edf60fda85f5793ce91d2 Mon Sep 17 00:00:00 2001 From: Logan-Ruf Date: Mon, 9 Oct 2023 13:51:19 -0700 Subject: [PATCH 3/6] Refactor Object data into main queue --- segment/analytics/client.py | 66 +++++++++++--------------- segment/analytics/consumer.py | 34 ++++++++++++-- segment/analytics/request.py | 87 ++++++++++++++++++++++++++++++----- 3 files changed, 131 insertions(+), 56 deletions(-) diff --git a/segment/analytics/client.py b/segment/analytics/client.py index a6db4920..3f0687cd 100644 --- a/segment/analytics/client.py +++ b/segment/analytics/client.py @@ -97,14 +97,6 @@ def __init__(self, proxies=proxies, ) self.consumers.append(consumer) - object_consumer = Consumer( - self.object_queue, write_key, host=objects_host, - endpoint=objects_endpoint, on_error=on_error, - upload_size=upload_size, upload_interval=upload_interval, - gzip=gzip, retries=max_retries, timeout=timeout, - proxies=proxies, - ) - self.consumers.append(object_consumer) # if we've disabled sending, just don't start the consumer if send: @@ -273,29 +265,30 @@ def object(self, object_id=None, collection=None, properties=None): def _enqueue(self, msg): """Push a new `msg` onto the queue, return `(success, msg)`""" - timestamp = msg['timestamp'] - if timestamp is None: - timestamp = datetime.utcnow().replace(tzinfo=tzutc()) - message_id = msg.get('messageId') - if message_id is None: - message_id = uuid4() - - require('integrations', msg['integrations'], dict) - require('type', msg['type'], str) - require('timestamp', timestamp, datetime) - require('context', msg['context'], dict) - - # add common - timestamp = guess_timezone(timestamp) - msg['timestamp'] = timestamp.isoformat(timespec='milliseconds') - msg['messageId'] = stringify_id(message_id) - msg['context']['library'] = { - 'name': 'analytics-python', - 'version': VERSION - } - - msg['userId'] = stringify_id(msg.get('userId', None)) - msg['anonymousId'] = stringify_id(msg.get('anonymousId', None)) + if msg['type'] != 'object': + timestamp = msg['timestamp'] + if timestamp is None: + timestamp = datetime.utcnow().replace(tzinfo=tzutc()) + message_id = msg.get('messageId') + if message_id is None: + message_id = uuid4() + + require('integrations', msg['integrations'], dict) + require('type', msg['type'], str) + require('timestamp', timestamp, datetime) + require('context', msg['context'], dict) + + # add common + timestamp = guess_timezone(timestamp) + msg['timestamp'] = timestamp.isoformat(timespec='milliseconds') + msg['messageId'] = stringify_id(message_id) + msg['context']['library'] = { + 'name': 'analytics-python', + 'version': VERSION + } + + msg['userId'] = stringify_id(msg.get('userId', None)) + msg['anonymousId'] = stringify_id(msg.get('anonymousId', None)) msg = clean(msg) self.log.debug('queueing: %s', msg) @@ -316,12 +309,8 @@ def _enqueue(self, msg): return True, msg - if msg.get('type') == 'object': - current_queue = self.object_queue - else: - current_queue = self.queue try: - current_queue.put(msg, block=False) + self.queue.put(msg, block=False) self.log.debug('enqueued %s.', msg['type']) return True, msg except queue.Full: @@ -330,9 +319,8 @@ def _enqueue(self, msg): def flush(self): """Forces a flush from the internal queue to the server""" - queue = self.queue - size = queue.qsize() - queue.join() + size = self.queue.qsize() + self.queue.join() # Note that this message may not be precise, because of threading. self.log.debug('successfully flushed about %s items.', size) diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py index 7e7625c6..a1247473 100644 --- a/segment/analytics/consumer.py +++ b/segment/analytics/consumer.py @@ -4,7 +4,7 @@ import backoff import json -from segment.analytics.request import post, APIError, DatetimeSerializer +from segment.analytics.request import post, APIError, DatetimeSerializer, start_object_session, post_object from queue import Empty @@ -42,6 +42,8 @@ def __init__(self, queue, write_key, upload_size=100, host=None, endpoint=None, self.retries = retries self.timeout = timeout self.proxies = proxies + # Object API + self.sessionId = None def run(self): """Runs the consumer.""" @@ -80,6 +82,7 @@ def next(self): """Return the next batch of items to upload.""" queue = self.queue items = [] + objects = [] start_time = monotonic.monotonic() total_size = 0 @@ -97,7 +100,11 @@ def next(self): self.log.error( 'Item exceeds 32kb limit, dropping. (%s)', str(item)) continue - items.append(item) + if getattr(item, 'type', None) == 'object': + objects.append(item) + else: + items.append(item) + # TODO: total_size needs to account for both APIs total_size += item_size if total_size >= BATCH_SIZE_LIMIT: self.log.debug( @@ -110,9 +117,27 @@ def next(self): return items + def request_object(self, batch): + """Upload object data to the object API """ + + # TODO add session keep alive logic + if self.sessionId is None: + self.sessionId = self._request(start_object_session, self.write_key, timeout=self.timeout, + proxies=self.proxies) + + url = f'https://objects-bulk-api.segmentapis.com/v0/upload/{self.sessionId}' + + self._request(post_object, url, batch, write_key=self.write_key, timeout=self.timeout, proxies=self.proxies) + def request(self, batch): """Attempt to upload the batch and retry before raising an error """ + self._request(post, self.write_key, self.host, endpoint=self.endpoint, gzip=self.gzip, + timeout=self.timeout, batch=batch, proxies=self.proxies) + + def _request(self, request_func, *args, **kwargs): + """Request wrapper that will retry before raising an error """ + def fatal_exception(exc): if isinstance(exc, APIError): # retry on server errors and client errors @@ -129,7 +154,6 @@ def fatal_exception(exc): max_tries=self.retries + 1, giveup=fatal_exception) def send_request(): - post(self.write_key, self.host, endpoint=self.endpoint, gzip=self.gzip, - timeout=self.timeout, batch=batch, proxies=self.proxies) + return request_func(*args, **kwargs) - send_request() + return send_request() diff --git a/segment/analytics/request.py b/segment/analytics/request.py index a7855c1d..3dfa2391 100644 --- a/segment/analytics/request.py +++ b/segment/analytics/request.py @@ -13,6 +13,22 @@ _session = sessions.Session() +def send_post(*args, **kwargs): + log = logging.getLogger('segment') + res = _session.post(*args, **kwargs) + + if res.status_code == 200: + log.debug('data uploaded successfully') + return res + + try: + payload = res.json() + log.debug('received response: %s', payload) + raise APIError(res.status_code, payload['code'], payload['message']) + except ValueError: + raise APIError(res.status_code, 'unknown', res.text) + + def post(write_key, host=None, endpoint=None, gzip=False, timeout=15, proxies=None, **kwargs): """Post the `kwargs` to the API""" log = logging.getLogger('segment') @@ -39,25 +55,72 @@ def post(write_key, host=None, endpoint=None, gzip=False, timeout=15, proxies=No "data": data, "auth": auth, "headers": headers, - "timeout": 15, + "timeout": timeout, } if proxies: kwargs['proxies'] = proxies - res = _session.post(url, data=data, auth=auth, - headers=headers, timeout=timeout) + return send_post(url, **kwargs) - if res.status_code == 200: - log.debug('data uploaded successfully') - return res - try: - payload = res.json() - log.debug('received response: %s', payload) - raise APIError(res.status_code, payload['code'], payload['message']) - except ValueError: - raise APIError(res.status_code, 'unknown', res.text) +def start_object_session(write_key, timeout=15, proxies=None): + """Start a new session with Segment's Object API.""" + log = logging.getLogger('segment') + auth = HTTPBasicAuth(write_key, '') + headers = { + 'Content-Type': 'application/json', + 'User-Agent': 'analytics-python/' + VERSION + } + + kwargs = { + "auth": auth, + "headers": headers, + "timeout": timeout, + } + + if proxies: + kwargs['proxies'] = proxies + + res = send_post('https://objects-bulk-api.segmentapis.com/v0/start', auth=auth, + headers=headers, timeout=timeout) + data = res.json() + sync_id = data['sync_id'] + log.debug(f'object session start: {sync_id = }') + return data['sync_id'] + + +def post_object(url, batch, write_key=None, timeout=15, proxies=None): + log = logging.getLogger('segment') + auth = HTTPBasicAuth(write_key, '') + + data_str = "\n".join(json.dumps(obj) for obj in batch) + buf = BytesIO() + with GzipFile(fileobj=buf, mode='w') as gz: + # 'data' was produced by json.dumps(), + # whose default encoding is utf-8. + gz.write(data_str.encode('utf-8')) + data = buf.getvalue() + + headers = { + 'Content-Encoding': 'gzip', + 'Content-Type': 'application/json', + 'User-Agent': 'analytics-python/' + VERSION + } + + kwargs = { + "auth": auth, + "headers": headers, + "timeout": timeout, + "data": data, + } + + if proxies: + kwargs['proxies'] = proxies + + log.debug(f'Making request to Segment Object API:\n {data_str}') + + return send_post(url, **kwargs) class APIError(Exception): From 438fe7f1b994842e694bbb8c24fb4a478acde0a7 Mon Sep 17 00:00:00 2001 From: Logan-Ruf Date: Tue, 10 Oct 2023 13:12:02 -0700 Subject: [PATCH 4/6] Added Keep Alive logic and bug fixes for object API --- segment/analytics/__init__.py | 1 + segment/analytics/client.py | 14 ++++---- segment/analytics/consumer.py | 65 +++++++++++++++++++++++++++-------- segment/analytics/request.py | 31 +++++++++++++---- 4 files changed, 83 insertions(+), 28 deletions(-) diff --git a/segment/analytics/__init__.py b/segment/analytics/__init__.py index 8b923450..db25221d 100644 --- a/segment/analytics/__init__.py +++ b/segment/analytics/__init__.py @@ -15,6 +15,7 @@ gzip = Client.DefaultConfig.gzip timeout = Client.DefaultConfig.timeout max_retries = Client.DefaultConfig.max_retries +keep_alive = Client.DefaultConfig.keep_alive default_client = None diff --git a/segment/analytics/client.py b/segment/analytics/client.py index 3f0687cd..d8dfdb91 100644 --- a/segment/analytics/client.py +++ b/segment/analytics/client.py @@ -21,8 +21,7 @@ class Client(object): class DefaultConfig(object): write_key = None host = None - objects_host = 'https://objects.segment.com' - objects_endpoint = '/v1/set' + endpoint = None on_error = None debug = False send = True @@ -36,6 +35,7 @@ class DefaultConfig(object): thread = 1 upload_interval = 0.5 upload_size = 100 + keep_alive = True """Create a new Segment client.""" log = logging.getLogger('segment') @@ -43,8 +43,6 @@ class DefaultConfig(object): def __init__(self, write_key=DefaultConfig.write_key, host=DefaultConfig.host, - objects_host=DefaultConfig.objects_host, - objects_endpoint=DefaultConfig.objects_endpoint, debug=DefaultConfig.debug, max_queue_size=DefaultConfig.max_queue_size, max_object_queue_size=DefaultConfig.max_object_queue_size, @@ -57,7 +55,8 @@ def __init__(self, proxies=DefaultConfig.proxies, thread=DefaultConfig.thread, upload_size=DefaultConfig.upload_size, - upload_interval=DefaultConfig.upload_interval,): + upload_interval=DefaultConfig.upload_interval, + keep_alive=DefaultConfig.keep_alive): require('write_key', write_key, str) self.queue = queue.Queue(max_queue_size) @@ -68,11 +67,10 @@ def __init__(self, self.send = send self.sync_mode = sync_mode self.host = host - self.objects_host = objects_host - self.objects_endpoint = objects_endpoint self.gzip = gzip self.timeout = timeout self.proxies = proxies + self.keep_alive = keep_alive if debug: self.log.setLevel(logging.DEBUG) @@ -94,7 +92,7 @@ def __init__(self, self.queue, write_key, host=host, on_error=on_error, upload_size=upload_size, upload_interval=upload_interval, gzip=gzip, retries=max_retries, timeout=timeout, - proxies=proxies, + proxies=proxies, keep_alive=keep_alive, ) self.consumers.append(consumer) diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py index a1247473..a11f4682 100644 --- a/segment/analytics/consumer.py +++ b/segment/analytics/consumer.py @@ -4,10 +4,13 @@ import backoff import json -from segment.analytics.request import post, APIError, DatetimeSerializer, start_object_session, post_object +from segment.analytics.request import post, APIError, DatetimeSerializer, start_object_session, post_object, \ + keep_alive_object_session from queue import Empty +from segment.analytics.utils import remove_trailing_slash + MAX_MSG_SIZE = 32 << 10 # Our servers only accept batches less than 500KB. Here limit is set slightly @@ -21,7 +24,7 @@ class Consumer(Thread): def __init__(self, queue, write_key, upload_size=100, host=None, endpoint=None, on_error=None, upload_interval=0.5, gzip=False, retries=10, - timeout=15, proxies=None): + timeout=15, proxies=None, keep_alive=True): """Create a consumer thread.""" Thread.__init__(self) # Make consumer a daemon thread so that it doesn't block program exit @@ -29,8 +32,8 @@ def __init__(self, queue, write_key, upload_size=100, host=None, endpoint=None, self.upload_size = upload_size self.upload_interval = upload_interval self.write_key = write_key - self.host = host - self.endpoint = endpoint + self.host = host or 'https://api.segment.io' + self.endpoint = endpoint or '/v1/batch' self.on_error = on_error self.queue = queue self.gzip = gzip @@ -43,13 +46,21 @@ def __init__(self, queue, write_key, upload_size=100, host=None, endpoint=None, self.timeout = timeout self.proxies = proxies # Object API - self.sessionId = None + self.object_host = 'https://objects-bulk-api.segmentapis.com' + self.object_start_endpoint = '/v0/start' + self.object_upload_endpoint = '/v0/upload/{sync_id}' + self.object_keep_alive_endpoint = '/v0/keep-alive/{sync_id}' + self.session_id = None + self.keep_alive = keep_alive + self.keep_alive_time = None + self.keep_alive_interval = 9.5 * 60 # 9.5 minutes in seconds def run(self): """Runs the consumer.""" self.log.debug('consumer is running...') while self.running: self.upload() + self.check_keep_alive() self.log.debug('consumer exited.') @@ -60,12 +71,13 @@ def pause(self): def upload(self): """Upload the next batch of items, return whether successful.""" success = False - batch = self.next() - if len(batch) == 0: + batch, objects = self.next() + if len(batch) == 0 and len(objects) == 0: return False try: self.request(batch) + self.request_object(objects) success = True except Exception as e: self.log.error('error uploading: %s', e) @@ -100,7 +112,7 @@ def next(self): self.log.error( 'Item exceeds 32kb limit, dropping. (%s)', str(item)) continue - if getattr(item, 'type', None) == 'object': + if item['type'] == 'object': objects.append(item) else: items.append(item) @@ -115,22 +127,47 @@ def next(self): except Exception as e: self.log.exception('Exception: %s', e) - return items + return items, objects + + def check_keep_alive(self): + """Check if the object session needs to be kept alive """ + if not self.keep_alive or self.keep_alive_time is None or self.session_id is None: + return + now = monotonic.monotonic() + elapsed = now - self.keep_alive_time + if elapsed >= self.keep_alive_interval: + self.log.debug('keep alive interval passed') + self.request_keep_alive() + + def request_keep_alive(self): + """Keep alive the object session """ + if self.session_id is None: + self.log.error('No object session id found, cannot keep alive session') + return + url = remove_trailing_slash(self.object_host) + self.object_keep_alive_endpoint.format(sync_id=self.session_id) + self._request(keep_alive_object_session, url, self.write_key, self.session_id, + timeout=self.timeout, proxies=self.proxies) + self.keep_alive_time = monotonic.monotonic() def request_object(self, batch): """Upload object data to the object API """ + if len(batch) == 0: + return # TODO add session keep alive logic - if self.sessionId is None: - self.sessionId = self._request(start_object_session, self.write_key, timeout=self.timeout, - proxies=self.proxies) - - url = f'https://objects-bulk-api.segmentapis.com/v0/upload/{self.sessionId}' + if self.session_id is None: + url = remove_trailing_slash(self.object_host) + self.object_start_endpoint + self.session_id = self._request(start_object_session, url, self.write_key, + timeout=self.timeout, proxies=self.proxies) + url = remove_trailing_slash(self.object_host) + self.object_upload_endpoint.format(sync_id=self.session_id) self._request(post_object, url, batch, write_key=self.write_key, timeout=self.timeout, proxies=self.proxies) + self.keep_alive_time = monotonic.monotonic() def request(self, batch): """Attempt to upload the batch and retry before raising an error """ + if len(batch) == 0: + return self._request(post, self.write_key, self.host, endpoint=self.endpoint, gzip=self.gzip, timeout=self.timeout, batch=batch, proxies=self.proxies) diff --git a/segment/analytics/request.py b/segment/analytics/request.py index 3dfa2391..a6981544 100644 --- a/segment/analytics/request.py +++ b/segment/analytics/request.py @@ -17,8 +17,8 @@ def send_post(*args, **kwargs): log = logging.getLogger('segment') res = _session.post(*args, **kwargs) - if res.status_code == 200: - log.debug('data uploaded successfully') + if res.status_code in [200, 201]: + log.debug('request sent successfully') return res try: @@ -34,7 +34,7 @@ def post(write_key, host=None, endpoint=None, gzip=False, timeout=15, proxies=No log = logging.getLogger('segment') body = kwargs body["sentAt"] = datetime.utcnow().replace(tzinfo=tzutc()).isoformat() - url = remove_trailing_slash(host or 'https://api.segment.io') + (endpoint or '/v1/batch') + url = remove_trailing_slash(host) + endpoint auth = HTTPBasicAuth(write_key, '') data = json.dumps(body, cls=DatetimeSerializer) log.debug('making request: %s', data) @@ -64,7 +64,7 @@ def post(write_key, host=None, endpoint=None, gzip=False, timeout=15, proxies=No return send_post(url, **kwargs) -def start_object_session(write_key, timeout=15, proxies=None): +def start_object_session(url, write_key, timeout=15, proxies=None): """Start a new session with Segment's Object API.""" log = logging.getLogger('segment') auth = HTTPBasicAuth(write_key, '') @@ -82,13 +82,32 @@ def start_object_session(write_key, timeout=15, proxies=None): if proxies: kwargs['proxies'] = proxies - res = send_post('https://objects-bulk-api.segmentapis.com/v0/start', auth=auth, - headers=headers, timeout=timeout) + res = send_post(url, auth=auth, headers=headers, timeout=timeout) data = res.json() sync_id = data['sync_id'] log.debug(f'object session start: {sync_id = }') return data['sync_id'] +def keep_alive_object_session(url, write_key, sync_id, timeout=15, proxies=None): + log = logging.getLogger('segment') + auth = HTTPBasicAuth(write_key, '') + headers = { + 'Content-Type': 'application/json', + 'User-Agent': 'analytics-python/' + VERSION + } + + kwargs = { + "auth": auth, + "headers": headers, + "timeout": timeout, + } + + if proxies: + kwargs['proxies'] = proxies + + send_post(url, auth=auth, headers=headers, timeout=timeout) + log.debug(f'object session kept alive: {sync_id = }') + def post_object(url, batch, write_key=None, timeout=15, proxies=None): log = logging.getLogger('segment') From 6371d12955ab86c3d6ac41be667e16793b395607 Mon Sep 17 00:00:00 2001 From: Logan-Ruf Date: Mon, 23 Oct 2023 15:59:54 -0700 Subject: [PATCH 5/6] Fixing Queue task_done issues --- segment/analytics/client.py | 2 -- segment/analytics/consumer.py | 7 +++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/segment/analytics/client.py b/segment/analytics/client.py index d8dfdb91..148decd5 100644 --- a/segment/analytics/client.py +++ b/segment/analytics/client.py @@ -45,7 +45,6 @@ def __init__(self, host=DefaultConfig.host, debug=DefaultConfig.debug, max_queue_size=DefaultConfig.max_queue_size, - max_object_queue_size=DefaultConfig.max_object_queue_size, send=DefaultConfig.send, on_error=DefaultConfig.on_error, gzip=DefaultConfig.gzip, @@ -60,7 +59,6 @@ def __init__(self, require('write_key', write_key, str) self.queue = queue.Queue(max_queue_size) - self.object_queue = queue.Queue(max_object_queue_size) self.write_key = write_key self.on_error = on_error self.debug = debug diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py index a11f4682..bdcd9ad5 100644 --- a/segment/analytics/consumer.py +++ b/segment/analytics/consumer.py @@ -72,7 +72,8 @@ def upload(self): """Upload the next batch of items, return whether successful.""" success = False batch, objects = self.next() - if len(batch) == 0 and len(objects) == 0: + total_size = len(batch) + len(objects) + if total_size == 0: return False try: @@ -86,7 +87,7 @@ def upload(self): self.on_error(e, batch) finally: # mark items as acknowledged from queue - for _ in batch: + for _ in range(total_size): self.queue.task_done() return success @@ -111,6 +112,7 @@ def next(self): if item_size > MAX_MSG_SIZE: self.log.error( 'Item exceeds 32kb limit, dropping. (%s)', str(item)) + queue.task_done() # Call task_done() for the dropped item continue if item['type'] == 'object': objects.append(item) @@ -125,6 +127,7 @@ def next(self): except Empty: break except Exception as e: + queue.task_done() # Call task_done() for items that raise exceptions self.log.exception('Exception: %s', e) return items, objects From 4ba6e9af44646a17389fc6d7d6cdfeb4448d2ea0 Mon Sep 17 00:00:00 2001 From: Logan-Ruf Date: Mon, 30 Oct 2023 13:51:11 -0700 Subject: [PATCH 6/6] Object API objectId -> id --- segment/analytics/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/segment/analytics/client.py b/segment/analytics/client.py index 148decd5..ac0f028a 100644 --- a/segment/analytics/client.py +++ b/segment/analytics/client.py @@ -251,7 +251,7 @@ def object(self, object_id=None, collection=None, properties=None): require('properties', properties, dict) msg = { - 'objectId': object_id, + 'id': object_id, 'collection': collection, 'properties': properties, 'type': 'object'