A Python 3 library for Dow Jones customers to consume data from a Factiva Analytics Stream.
If you don't wish to work with any of the demo snippets provided by this repository, you can proceed with a clean installation of the package by running one of the following commands.
# Install the last published version
pip3 install dnaStreaming
# Install a specific published version
pip3 install dnaStreaming==<VERSION>
# Install from a specific Github branch
pip3 install git+https://github.com/dowjones/dj-dna-streams-python.git@<BRANCH_NAME>
Alternatively, you can add it as a dependency inside your requirements.txt file:
...
# Include ONE of the following lines:
dnaStreaming
dnaStreaming==<VERSION>
git+https://github.com/dowjones/dj-dna-streams-python.git@<BRANCH_NAME>
...
Or inside your pyproject.toml file:
[project]
...
dependencies = [
    # Include only ONE of the following lines:
    "dnaStreaming",
    "dnaStreaming==<VERSION>",
    # A direct URL reference in pyproject.toml needs the "name @ url" form
    "dnaStreaming @ git+https://github.com/dowjones/dj-dna-streams-python.git@<BRANCH_NAME>",
]
On the other hand, if you wish to work with one of the code samples provided in this repository, or with your own customized version of the listener, you can install from source code once you have cloned the repository.
cd dj-dna-streams-python
git switch <BRANCH> # optional if you wish to work on a specific branch
python3 -m venv env
source env/bin/activate
pip3 install -e . # editable install, recommended
Once you're done installing from source code, make sure you set your credentials as environment variables as described by the Authentication and Configuration sections.
Then, you can run the demo file that you prefer:
python3 demo/show_stream.py
# or
python3 demo/show_stream_async.py
You have two options to authenticate:
- By using your user key
- By using an OAuth bearer token (new)
To run this code, you need to provide credentials from one of the authentication methods, as well as your subscription ID. There are three ways you can do this: environment variables, a configuration file, or function arguments.
To set your credentials, set either USER_KEY or OAUTH_TOKEN as an environment variable:
export USER_KEY="<your_user_key>"
# or
export OAUTH_TOKEN="<your_oauth_bearer_token>"
To set your subscription ID, set an environment variable named SUBSCRIPTION_ID:
export SUBSCRIPTION_ID="ABC1234567889"
To set your log folder path, set an environment variable named LOG_PATH:
export LOG_PATH="/your/custom/log/path"
In this codebase, you will find a file named customer_config.json. You are not required to use this file, but if you prefer to, fill in the JSON object it contains with your user key and your subscription ID. Follow basic JSON formatting and syntax conventions.
Note: The listener will search for the customer_config.json file inside your $HOME directory by default.
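As a small illustration of the default lookup described in the note, the sketch below checks whether a customer_config.json file exists in the home directory and parses as valid JSON before a listener is created. The helper name `load_default_config` and its `home` parameter are hypothetical and not part of dnaStreaming; the sketch makes no assumption about which keys the file contains.

```python
import json
from pathlib import Path


def load_default_config(home=None):
    """Return the parsed customer_config.json from the home directory, or None.

    Hypothetical helper for illustration only; not part of the dnaStreaming package.
    """
    base = Path(home) if home is not None else Path.home()
    config_path = base / "customer_config.json"
    if not config_path.is_file():
        return None
    # json.load raises json.JSONDecodeError (a ValueError) on malformed JSON,
    # which surfaces syntax mistakes before the listener starts.
    with config_path.open(encoding="utf-8") as handle:
        return json.load(handle)
```

Calling `load_default_config()` with no arguments inspects the current user's home directory; a `None` result means no file was found there.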
If you prefer using an explicit path to your configuration file, pass the absolute path to the Listener constructor:
from dnaStreaming.listener import Listener
# Config file authentication
listener = Listener(config_file="<ABSOLUTE_PATH_TO_YOUR_CONFIG_FILE>")
You may pass your user key or OAuth token to the Listener constructor and your subscription ID directly to the listen method:
from dnaStreaming.listener import Listener
# Use the user_key argument to provide your credentials
listener = Listener(user_key="<YOUR_USER_KEY>")
# Alternatively, use the oauth_token argument
listener = Listener(oauth_token="<YOUR_OAUTH_BEARER_TOKEN>")
# Use the subscription_id argument to provide your subscription id to the listener
listener.listen(callback, subscription_id="<YOUR_SUBSCRIPTION_ID>")
# The same parameter applies for the async variation
listener.listen_async(callback, subscription_id="<YOUR_SUBSCRIPTION_ID>")
Note: Passing credentials and subscription ID(s) via function arguments will override both environment variables and config file settings.
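The override rule in the note can be pictured with a short sketch. This is not the library's implementation: `resolve_setting` is a hypothetical helper, the config key passed to it is illustrative, and the order between environment variables and the config file is an assumption, since the text above only states that function arguments take priority over both.

```python
import os


def resolve_setting(argument, env_name, config, config_key):
    """Pick a value: explicit argument first, then environment, then config.

    Hypothetical helper; the env-before-config order is an assumption.
    """
    if argument:
        return argument
    env_value = os.environ.get(env_name)
    if env_value:
        return env_value
    # config may be None when no configuration file was loaded.
    return (config or {}).get(config_key)
```

For example, `resolve_setting(None, "SUBSCRIPTION_ID", {}, "subscription_id")` returns whatever SUBSCRIPTION_ID holds in the environment, or `None` if it is unset.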
If you choose to strictly rely on environment variables, you can simply initialize the listener empty:
from dnaStreaming.listener import Listener
listener = Listener()
If you want to listen to messages synchronously:
def callback(message, subscription_id):
    print(f'Subscription ID: {subscription_id}: Message: {message.data}')
    return True # Return False to stop the message flow and unblock the process.
# Omitting maximum_messages means you will continue to get messages as they appear.
# This can be a firehose. Use with caution.
listener.listen(callback, maximum_messages=10)
If you want to listen to messages asynchronously:
from time import sleep
def callback(message, subscription_id):
    print(f'Subscription ID: {subscription_id}: Message: {message.data}')
future = listener.listen_async(callback)
# After calling `listen_async`, you need to keep the main thread alive.
for count in range(0, 5):
    sleep(1)
# Stop receiving messages after 5 seconds
if future.running():
    future.cancel()
Minimal logging is written to a file named dj-dna-streaming-python.log.
By default, logs are written to the first available directory from the following list:
- A custom path set via the environment variable LOG_PATH.
- A logs/ folder located within the package installation directory.
- A fallback directory: ~/.dj-dna-streaming-python/logs/.
The first writable location found is selected. A message like "Will log to: /your/custom/log/path" is printed to the console on startup.
💡 Note: The log file is overwritten each time the application starts to keep maintenance simple.
You can specify:
- Absolute paths: For example, /var/log/dna-streaming.
- Relative paths: For example, ./logs, relative to the current working directory at runtime.
The code verifies that the specified path is writable. If it isn’t, it automatically falls back to the next available option.
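The fallback behavior can be sketched as a "first writable directory wins" loop. This is only an illustration under stated assumptions, not the library's code: `first_writable_dir` is a hypothetical helper, and creating a missing directory before checking it is an assumption about how candidates are handled.

```python
import os
from pathlib import Path


def first_writable_dir(candidates):
    """Return the first candidate directory that is usable and writable, else None.

    Hypothetical helper; creating missing directories is an assumption.
    """
    for candidate in candidates:
        if not candidate:
            continue  # e.g. LOG_PATH is not set
        path = Path(candidate).expanduser()
        try:
            path.mkdir(parents=True, exist_ok=True)
        except OSError:
            continue  # cannot be created here, try the next option
        if os.access(path, os.W_OK):
            return path
    return None
```

A caller could pass `[os.environ.get("LOG_PATH"), "~/.dj-dna-streaming-python/logs"]` to mirror the custom path and the fallback directory from the list above.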
To prepare for local development and debugging, create a virtual environment and install both runtime and dev dependencies (make sure to run with a recent version of Python 3):
python3 -m venv env
source env/bin/activate
# Install runtime dependencies from pyproject.toml (-e for recommended editable install)
pip3 install -e .
# Install dev dependencies from pyproject.toml
pip3 install -e ".[dev]"
To test that the library works across multiple Python versions (e.g., Python >= 3.10), use tox:
# Make sure to set USER_KEY (or OAUTH_TOKEN), API_HOST and SUBSCRIPTION_ID accordingly
# Strong recommendation:
# - SUBSCRIPTION_ID should belong to an active stream with about 2000 queued messages
# for faster test execution times.
tox
Before releasing, or ideally before each commit, run ruff on your code
to apply the minimal recommended linting. At the project root, run:
ruff check . # checks code without changing it
ruff check --fix . # checks code and applies changes
ruff format . # formats code