Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ the release.
([2214](https://github.com/open-telemetry/opentelemetry-demo/pull/2214))
* [quote] replace debian image with latest alpine image
([2216](https://github.com/open-telemetry/opentelemetry-demo/pull/2216))
* [load-generator] Update locustfile for logging with TraceContext
([2265](https://github.com/open-telemetry/opentelemetry-demo/pull/2265))

## 2.0.2

Expand Down
195 changes: 127 additions & 68 deletions src/load-generator/locustfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.system_metrics import SystemMetricsInstrumentor
from opentelemetry.instrumentation.urllib3 import URLLib3Instrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
Expand All @@ -39,34 +40,46 @@

from playwright.async_api import Route, Request

logger_provider = LoggerProvider(resource=Resource.create(
{
"service.name": "load-generator",
}
),)
# Set up resource for consistent identification across telemetry signals
resource = Resource.create({
"service.name": "load-generator",
})

# Configure tracer provider first (needed for trace context in logs)
tracer_provider = TracerProvider(resource=resource)
trace.set_tracer_provider(tracer_provider)
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(insecure=True)))

# Configure logger provider with the same resource
logger_provider = LoggerProvider(resource=resource)
set_logger_provider(logger_provider)

exporter = OTLPLogExporter(insecure=True)
logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
# Set up log exporter and processor
log_exporter = OTLPLogExporter(insecure=True)
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))

# Create logging handler that will include trace context
handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider)

# Attach OTLP handler to locust logger
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)
# Configure root logger
root_logger = logging.getLogger()
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)

exporter = OTLPMetricExporter(insecure=True)
set_meter_provider(MeterProvider([PeriodicExportingMetricReader(exporter)]))
# Configure metrics
metric_exporter = OTLPMetricExporter(insecure=True)
set_meter_provider(MeterProvider([PeriodicExportingMetricReader(metric_exporter)], resource=resource))

tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
# Instrument logging to automatically inject trace context
LoggingInstrumentor().instrument(set_logging_format=True)

# Instrumenting manually to avoid error with locust gevent monkey
Jinja2Instrumentor().instrument()
RequestsInstrumentor().instrument()
SystemMetricsInstrumentor().instrument()
URLLib3Instrumentor().instrument()
logging.info("Instrumentation complete")

logging.info("Instrumentation complete - logs will now include trace context")

# Initialize Flagd provider
base_url = f"http://{os.environ.get('FLAGD_HOST', 'localhost')}:{os.environ.get('FLAGD_OFREP_PORT', 8016)}"
Expand Down Expand Up @@ -107,76 +120,112 @@ def get_flagd_value(FlagName):
class WebsiteUser(HttpUser):
wait_time = between(1, 10)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.tracer = trace.get_tracer(__name__)

@task(1)
def index(self):
self.client.get("/")
with self.tracer.start_as_current_span("user_index"):
logging.info("User accessing index page")
self.client.get("/")

@task(10)
def browse_product(self):
self.client.get("/api/products/" + random.choice(products))
product = random.choice(products)
with self.tracer.start_as_current_span("user_browse_product", attributes={"product.id": product}):
logging.info(f"User browsing product: {product}")
self.client.get("/api/products/" + product)

@task(3)
def get_recommendations(self):
params = {
"productIds": [random.choice(products)],
}
self.client.get("/api/recommendations", params=params)
product = random.choice(products)
with self.tracer.start_as_current_span("user_get_recommendations", attributes={"product.id": product}):
logging.info(f"User getting recommendations for product: {product}")
params = {
"productIds": [product],
}
self.client.get("/api/recommendations", params=params)

@task(3)
def get_ads(self):
params = {
"contextKeys": [random.choice(categories)],
}
self.client.get("/api/data/", params=params)
category = random.choice(categories)
with self.tracer.start_as_current_span("user_get_ads", attributes={"category": str(category)}):
logging.info(f"User getting ads for category: {category}")
params = {
"contextKeys": [category],
}
self.client.get("/api/data/", params=params)

@task(3)
def view_cart(self):
self.client.get("/api/cart")
with self.tracer.start_as_current_span("user_view_cart"):
logging.info("User viewing cart")
self.client.get("/api/cart")

@task(2)
def add_to_cart(self, user=""):
if user == "":
user = str(uuid.uuid1())
product = random.choice(products)
self.client.get("/api/products/" + product)
cart_item = {
"item": {
"productId": product,
"quantity": random.choice([1, 2, 3, 4, 5, 10]),
},
"userId": user,
}
self.client.post("/api/cart", json=cart_item)
quantity = random.choice([1, 2, 3, 4, 5, 10])
with self.tracer.start_as_current_span("user_add_to_cart",
attributes={"user.id": user, "product.id": product, "quantity": quantity}):
logging.info(f"User {user} adding {quantity} of product {product} to cart")
self.client.get("/api/products/" + product)
cart_item = {
"item": {
"productId": product,
"quantity": quantity,
},
"userId": user,
}
self.client.post("/api/cart", json=cart_item)

@task(1)
def checkout(self):
# checkout call with an item added to cart
user = str(uuid.uuid1())
self.add_to_cart(user=user)
checkout_person = random.choice(people)
checkout_person["userId"] = user
self.client.post("/api/checkout", json=checkout_person)
with self.tracer.start_as_current_span("user_checkout_single", attributes={"user.id": user}):
logging.info(f"User {user} performing single item checkout")
self.add_to_cart(user=user)
checkout_person = random.choice(people)
checkout_person["userId"] = user
self.client.post("/api/checkout", json=checkout_person)
logging.info(f"Checkout completed for user {user}")

@task(1)
def checkout_multi(self):
# checkout call which adds 2-4 different items to cart before checkout
user = str(uuid.uuid1())
for i in range(random.choice([2, 3, 4])):
self.add_to_cart(user=user)
checkout_person = random.choice(people)
checkout_person["userId"] = user
self.client.post("/api/checkout", json=checkout_person)
item_count = random.choice([2, 3, 4])
with self.tracer.start_as_current_span("user_checkout_multi",
attributes={"user.id": user, "item.count": item_count}):
logging.info(f"User {user} performing multi-item checkout with {item_count} items")
for i in range(item_count):
self.add_to_cart(user=user)
checkout_person = random.choice(people)
checkout_person["userId"] = user
self.client.post("/api/checkout", json=checkout_person)
logging.info(f"Multi-item checkout completed for user {user}")

@task(5)
def flood_home(self):
for _ in range(0, get_flagd_value("loadGeneratorFloodHomepage")):
self.client.get("/")
flood_count = get_flagd_value("loadGeneratorFloodHomepage")
if flood_count > 0:
with self.tracer.start_as_current_span("user_flood_home", attributes={"flood.count": flood_count}):
logging.info(f"User flooding homepage {flood_count} times")
for _ in range(0, flood_count):
self.client.get("/")

def on_start(self):
ctx = baggage.set_baggage("session.id", str(uuid.uuid4()))
ctx = baggage.set_baggage("synthetic_request", "true", context=ctx)
context.attach(ctx)
self.index()
with self.tracer.start_as_current_span("user_session_start"):
session_id = str(uuid.uuid4())
logging.info(f"Starting user session: {session_id}")
ctx = baggage.set_baggage("session.id", session_id)
ctx = baggage.set_baggage("synthetic_request", "true", context=ctx)
context.attach(ctx)
self.index()


browser_traffic_enabled = os.environ.get("LOCUST_BROWSER_TRAFFIC_ENABLED", "").lower() in ("true", "yes", "on")
Expand All @@ -185,30 +234,40 @@ def on_start(self):
class WebsiteBrowserUser(PlaywrightUser):
headless = True # to use a headless browser, without a GUI

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.tracer = trace.get_tracer(__name__)

@task
@pw
async def open_cart_page_and_change_currency(self, page: PageWithRetry):
try:
page.on("console", lambda msg: print(msg.text))
await page.route('**/*', add_baggage_header)
await page.goto("/cart", wait_until="domcontentloaded")
await page.select_option('[name="currency_code"]', 'CHF')
await page.wait_for_timeout(2000) # giving the browser time to export the traces
except:
pass
with self.tracer.start_as_current_span("browser_change_currency"):
try:
logging.info("Browser user opening cart page and changing currency")
page.on("console", lambda msg: print(msg.text))
await page.route('**/*', add_baggage_header)
await page.goto("/cart", wait_until="domcontentloaded")
await page.select_option('[name="currency_code"]', 'CHF')
await page.wait_for_timeout(2000) # giving the browser time to export the traces
logging.info("Currency changed to CHF")
except Exception as e:
logging.error(f"Error in change currency task: {str(e)}")

@task
@pw
async def add_product_to_cart(self, page: PageWithRetry):
try:
page.on("console", lambda msg: print(msg.text))
await page.route('**/*', add_baggage_header)
await page.goto("/", wait_until="domcontentloaded")
await page.click('p:has-text("Roof Binoculars")', wait_until="domcontentloaded")
await page.click('button:has-text("Add To Cart")', wait_until="domcontentloaded")
await page.wait_for_timeout(2000) # giving the browser time to export the traces
except:
pass
with self.tracer.start_as_current_span("browser_add_to_cart"):
try:
logging.info("Browser user adding Roof Binoculars to cart")
page.on("console", lambda msg: print(msg.text))
await page.route('**/*', add_baggage_header)
await page.goto("/", wait_until="domcontentloaded")
await page.click('p:has-text("Roof Binoculars")', wait_until="domcontentloaded")
await page.click('button:has-text("Add To Cart")', wait_until="domcontentloaded")
await page.wait_for_timeout(2000) # giving the browser time to export the traces
logging.info("Product added to cart successfully")
except Exception as e:
logging.error(f"Error in add to cart task: {str(e)}")


async def add_baggage_header(route: Route, request: Request):
Expand Down
1 change: 1 addition & 0 deletions src/load-generator/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ opentelemetry-instrumentation-system-metrics==0.55b1
opentelemetry-instrumentation-urllib3==0.55b1
opentelemetry-sdk==1.34.1
opentelemetry-semantic-conventions==0.55b1
opentelemetry-instrumentation-logging==0.55b1