Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ the release.
([2214](https://github.com/open-telemetry/opentelemetry-demo/pull/2214))
* [quote] replace debian image with latest alpine image
([2216](https://github.com/open-telemetry/opentelemetry-demo/pull/2216))
* [load-generator] Update locustfile for logging with TraceContext
([2265](https://github.com/open-telemetry/opentelemetry-demo/pull/2265))

## 2.0.2

Expand Down
186 changes: 118 additions & 68 deletions src/load-generator/locustfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.system_metrics import SystemMetricsInstrumentor
from opentelemetry.instrumentation.urllib3 import URLLib3Instrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
Expand All @@ -39,34 +40,41 @@

from playwright.async_api import Route, Request

logger_provider = LoggerProvider(resource=Resource.create(
{
"service.name": "load-generator",
}
),)
# Configure tracer provider first (needed for trace context in logs)
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(insecure=True)))

# Configure logger provider with the same resource
logger_provider = LoggerProvider()
set_logger_provider(logger_provider)

exporter = OTLPLogExporter(insecure=True)
logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
# Set up log exporter and processor
log_exporter = OTLPLogExporter(insecure=True)
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))

# Create logging handler that will include trace context
handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider)

# Attach OTLP handler to locust logger
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)
# Configure root logger
root_logger = logging.getLogger()
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)

exporter = OTLPMetricExporter(insecure=True)
set_meter_provider(MeterProvider([PeriodicExportingMetricReader(exporter)]))
# Configure metrics
metric_exporter = OTLPMetricExporter(insecure=True)
set_meter_provider(MeterProvider([PeriodicExportingMetricReader(metric_exporter)]))

tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
# Instrument logging to automatically inject trace context
LoggingInstrumentor().instrument(set_logging_format=True)

# Instrumenting manually to avoid error with locust gevent monkey
Jinja2Instrumentor().instrument()
RequestsInstrumentor().instrument()
SystemMetricsInstrumentor().instrument()
URLLib3Instrumentor().instrument()
logging.info("Instrumentation complete")

logging.info("Instrumentation complete - logs will now include trace context")

# Initialize Flagd provider
base_url = f"http://{os.environ.get('FLAGD_HOST', 'localhost')}:{os.environ.get('FLAGD_OFREP_PORT', 8016)}"
Expand Down Expand Up @@ -107,76 +115,110 @@ def get_flagd_value(FlagName):
class WebsiteUser(HttpUser):
wait_time = between(1, 10)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.tracer = trace.get_tracer(__name__)

@task(1)
def index(self):
self.client.get("/")
with self.tracer.start_as_current_span("user_index"):
logging.info("User accessing index page")
self.client.get("/")

@task(10)
def browse_product(self):
self.client.get("/api/products/" + random.choice(products))
product = random.choice(products)
with self.tracer.start_as_current_span("user_browse_product", attributes={"product.id": product}):
logging.info(f"User browsing product: {product}")
self.client.get("/api/products/" + product)

@task(3)
def get_recommendations(self):
params = {
"productIds": [random.choice(products)],
}
self.client.get("/api/recommendations", params=params)
product = random.choice(products)
with self.tracer.start_as_current_span("user_get_recommendations", attributes={"product.id": product}):
logging.info(f"User getting recommendations for product: {product}")
params = {
"productIds": [product],
}
self.client.get("/api/recommendations", params=params)

@task(3)
def get_ads(self):
params = {
"contextKeys": [random.choice(categories)],
}
self.client.get("/api/data/", params=params)
category = random.choice(categories)
with self.tracer.start_as_current_span("user_get_ads", attributes={"category": str(category)}):
logging.info(f"User getting ads for category: {category}")
params = {
"contextKeys": [category],
}
self.client.get("/api/data/", params=params)

@task(3)
def view_cart(self):
self.client.get("/api/cart")
with self.tracer.start_as_current_span("user_view_cart"):
logging.info("User viewing cart")
self.client.get("/api/cart")

@task(2)
def add_to_cart(self, user=""):
if user == "":
user = str(uuid.uuid1())
product = random.choice(products)
self.client.get("/api/products/" + product)
cart_item = {
"item": {
"productId": product,
"quantity": random.choice([1, 2, 3, 4, 5, 10]),
},
"userId": user,
}
self.client.post("/api/cart", json=cart_item)
quantity = random.choice([1, 2, 3, 4, 5, 10])
with self.tracer.start_as_current_span("user_add_to_cart",
attributes={"user.id": user, "product.id": product, "quantity": quantity}):
logging.info(f"User {user} adding {quantity} of product {product} to cart")
self.client.get("/api/products/" + product)
cart_item = {
"item": {
"productId": product,
"quantity": quantity,
},
"userId": user,
}
self.client.post("/api/cart", json=cart_item)

@task(1)
def checkout(self):
# checkout call with an item added to cart
user = str(uuid.uuid1())
self.add_to_cart(user=user)
checkout_person = random.choice(people)
checkout_person["userId"] = user
self.client.post("/api/checkout", json=checkout_person)
with self.tracer.start_as_current_span("user_checkout_single", attributes={"user.id": user}):
self.add_to_cart(user=user)
checkout_person = random.choice(people)
checkout_person["userId"] = user
self.client.post("/api/checkout", json=checkout_person)
logging.info(f"Checkout completed for user {user}")

@task(1)
def checkout_multi(self):
# checkout call which adds 2-4 different items to cart before checkout
user = str(uuid.uuid1())
for i in range(random.choice([2, 3, 4])):
self.add_to_cart(user=user)
checkout_person = random.choice(people)
checkout_person["userId"] = user
self.client.post("/api/checkout", json=checkout_person)
item_count = random.choice([2, 3, 4])
with self.tracer.start_as_current_span("user_checkout_multi",
attributes={"user.id": user, "item.count": item_count}):
for i in range(item_count):
self.add_to_cart(user=user)
checkout_person = random.choice(people)
checkout_person["userId"] = user
self.client.post("/api/checkout", json=checkout_person)
logging.info(f"Multi-item checkout completed for user {user}")

@task(5)
def flood_home(self):
for _ in range(0, get_flagd_value("loadGeneratorFloodHomepage")):
self.client.get("/")
flood_count = get_flagd_value("loadGeneratorFloodHomepage")
if flood_count > 0:
with self.tracer.start_as_current_span("user_flood_home", attributes={"flood.count": flood_count}):
logging.info(f"User flooding homepage {flood_count} times")
for _ in range(0, flood_count):
self.client.get("/")

def on_start(self):
ctx = baggage.set_baggage("session.id", str(uuid.uuid4()))
ctx = baggage.set_baggage("synthetic_request", "true", context=ctx)
context.attach(ctx)
self.index()
with self.tracer.start_as_current_span("user_session_start"):
session_id = str(uuid.uuid4())
logging.info(f"Starting user session: {session_id}")
ctx = baggage.set_baggage("session.id", session_id)
ctx = baggage.set_baggage("synthetic_request", "true", context=ctx)
context.attach(ctx)
self.index()


browser_traffic_enabled = os.environ.get("LOCUST_BROWSER_TRAFFIC_ENABLED", "").lower() in ("true", "yes", "on")
Expand All @@ -185,30 +227,38 @@ def on_start(self):
class WebsiteBrowserUser(PlaywrightUser):
headless = True # to use a headless browser, without a GUI

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.tracer = trace.get_tracer(__name__)

@task
@pw
async def open_cart_page_and_change_currency(self, page: PageWithRetry):
try:
page.on("console", lambda msg: print(msg.text))
await page.route('**/*', add_baggage_header)
await page.goto("/cart", wait_until="domcontentloaded")
await page.select_option('[name="currency_code"]', 'CHF')
await page.wait_for_timeout(2000) # giving the browser time to export the traces
except:
pass
with self.tracer.start_as_current_span("browser_change_currency"):
try:
page.on("console", lambda msg: print(msg.text))
await page.route('**/*', add_baggage_header)
await page.goto("/cart", wait_until="domcontentloaded")
await page.select_option('[name="currency_code"]', 'CHF')
await page.wait_for_timeout(2000) # giving the browser time to export the traces
logging.info("Currency changed to CHF")
except Exception as e:
logging.error(f"Error in change currency task: {str(e)}")

@task
@pw
async def add_product_to_cart(self, page: PageWithRetry):
try:
page.on("console", lambda msg: print(msg.text))
await page.route('**/*', add_baggage_header)
await page.goto("/", wait_until="domcontentloaded")
await page.click('p:has-text("Roof Binoculars")', wait_until="domcontentloaded")
await page.click('button:has-text("Add To Cart")', wait_until="domcontentloaded")
await page.wait_for_timeout(2000) # giving the browser time to export the traces
except:
pass
with self.tracer.start_as_current_span("browser_add_to_cart"):
try:
page.on("console", lambda msg: print(msg.text))
await page.route('**/*', add_baggage_header)
await page.goto("/", wait_until="domcontentloaded")
await page.click('p:has-text("Roof Binoculars")', wait_until="domcontentloaded")
await page.click('button:has-text("Add To Cart")', wait_until="domcontentloaded")
await page.wait_for_timeout(2000) # giving the browser time to export the traces
logging.info("Product added to cart successfully")
except Exception as e:
logging.error(f"Error in add to cart task: {str(e)}")


async def add_baggage_header(route: Route, request: Request):
Expand Down
1 change: 1 addition & 0 deletions src/load-generator/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ opentelemetry-instrumentation-system-metrics==0.55b1
opentelemetry-instrumentation-urllib3==0.55b1
opentelemetry-sdk==1.34.1
opentelemetry-semantic-conventions==0.55b1
opentelemetry-instrumentation-logging==0.55b1