diff --git a/benchmark/elastic/elastic_benchmark_config.py b/benchmark/elastic/elastic_benchmark_config.py old mode 100644 new mode 100755 index eaa41eb..d24a65b --- a/benchmark/elastic/elastic_benchmark_config.py +++ b/benchmark/elastic/elastic_benchmark_config.py @@ -1,19 +1,19 @@ DATA_DIR="../workload/trace/" -WORKLOAD_FILE_NAME="network_log_search_30.ndjson" +#WORKLOAD_FILE_NAME="network_log_search_needles_30.ndjson" +#WORKLOAD_FILE_NAME="network_log_search_needles_30_2.ndjson" +WORKLOAD_FILE_NAME="network_log_analytics_avg_30.ndjson" +#WORKLOAD_FILE_NAME="debug.ndjson" OUTPUT_DIR="elastic-output/" -OUTPUT_FILE_NAME="query_execution_times.csv" +#OUTPUT_FILE_NAME="elastic_network_log_search_needles_30_execution_times.csv" +#OUTPUT_FILE_NAME="elastic_network_log_search_needles_30_2_execution_times.csv" +OUTPUT_FILE_NAME="elastic_network_log_analytics_avg_30_execution_times.csv" +#OUTPUT_FILE_NAME="debug.csv" -QUERIES = ["search id.orig_h", - "search id.orig_h + sort ts", - "search id.orig_h + sort ts + slice 5", - "search id.orig_h + count by id.resp_h", - "search id.orig_h + sum orig_bytes", - "search id.orig_h + count by schema"] +QUERIES = ["search id.orig_h","search id.orig_h + sort ts + slice 1000", "analytics avg field"] -AGGREGATION_FIELDS= {"search id.orig_h + count by id.resp_h": "id.resp_h", - "search id.orig_h + count by schema": "_path" } - +AGGREGATION_FIELDS= {"search id.orig_h + count by id.resp_h": "id.resp_h", + "search id.orig_h + count by schema": "_path" } diff --git a/benchmark/elastic/issue_elastic_queries.py b/benchmark/elastic/issue_elastic_queries.py index 7ac4c99..e9bec99 100755 --- a/benchmark/elastic/issue_elastic_queries.py +++ b/benchmark/elastic/issue_elastic_queries.py @@ -3,6 +3,7 @@ import numpy as np import os import pandas as pd +import time from elastic_benchmark_config import * @@ -13,67 +14,80 @@ queries = QUERIES aggregation_fields = AGGREGATION_FIELDS -def runQuery(query): +def runQuery(query, queryname): cmd = query + " > " + output_directory + "query-output.json" os.system(cmd) + + last_ports = [] + with open(output_directory + "query-output.json", 'r') as j: contents = json.loads(j.read()) executiontime = contents["took"] hits = contents["hits"]["total"]["value"] + if queryname == "analytics avg field": + avg = contents["aggregations"]["avg_field"]["value"] + else: + avg = 0 + if queryname == "search id.orig_h + sort ts + slice 1000": + for content in contents["hits"]["hits"]: + last_ports.append(content["_source"]["id.orig_p"]) + last_port = last_ports.pop() + else: + last_port = 0 + ret = {"executiontime": executiontime * 0.001, "hits": hits, "avg": avg, "last_port": last_port } os.system("rm " + output_directory + "query-output.json") - return {"executiontime": executiontime * 0.001, "hits": hits} + return ret -def getQuery(queryname, arguments): - # TODO: edit queries to return all results, not just top 10-1000 +def getQuery(queryname, arguments, sortval): if queryname == "search id.orig_h": - query = "curl -X GET 'http://localhost:9200/test/_search?q=id.orig_h:"+arguments[0]+"&format=json&pretty'" - elif queryname == "search id.orig_h + sort ts": - query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"sort\" : [\"ts\" ],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" - elif queryname == "search id.orig_h + sort ts + slice 5": - query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 5,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" - elif queryname == "search id.orig_h + count by id.resp_h": - query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"id.resp_h\": {\"terms\": {\"field\": \"id.resp_h\"}}}}'" - elif queryname == "search id.orig_h + sum orig_bytes": - query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"orig_bytes\": {\"sum\": {\"field\": \"orig_bytes\"}}}}'" - elif queryname == "search id.orig_h + count by schema": - query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"schema\": {\"terms\": {\"field\": \"_path\"}}}}'" + query = "curl -X POST \"localhost:9200/test/_search?scroll=1m&pretty\" -H 'Content-Type: application/json' -d'{\"query\": {\"term\" : {\"id.orig_h\" : \""+arguments[0]+"\"}}}'" + elif queryname == "search id.orig_h + sort ts + slice 1000": + query = "curl -X GET \"localhost:9200/test/_search?scroll=1m&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 1000,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" + elif queryname == "analytics avg field": + query = "curl -X GET \"localhost:9200/test/_search?scroll=1m&pretty\" -H 'Content-Type: application/json' -d'{\"aggs\": {\"avg_field\": {\"avg\": { \"field\": \""+arguments[0]+"\"}}}}'" else: query= "" return query def issueQueries(): - real_list = [] + took_list = [] query_name_list = [] argument_list = [] - validation_list =[] - + avg_list =[] + hits_list =[] + real_list=[] + port_list =[] + restartElastic() with open(input_directory + workload_file ) as f: data = ndjson.load(f) for d in data: + start_time = time.time() if (d['query'] in queries): queryname = d['query'] - query = getQuery(queryname, d['arguments']) + query = getQuery(queryname, d['arguments'], 0) if query: - if queryname in aggregation_fields: - prepForAggregation(aggregation_fields[queryname]) - queryoutput = runQuery(query) - real_list.append(queryoutput["executiontime"]) + queryoutput = runQuery(query, queryname) + + took_list.append(queryoutput['executiontime']) query_name_list.append(d['query']) argument_list.append(d['arguments']) - validation_list.append(queryoutput["hits"]) + avg_list.append(queryoutput["avg"]) + hits_list.append(queryoutput["hits"]) + port_list.append(queryoutput["last_port"]) - if queryname in aggregation_fields: - cleanFromAggregation(aggregation_fields[queryname]) else: print('Query Not Supported: ' + str(d['query'])) else: print('Query Not Supported: ' + str(d['query'])) - - num_queries = len(real_list) + + end_time = time.time() + real_list.append(end_time-start_time) + + num_queries = len(took_list) output_df = pd.DataFrame({'index': range(num_queries)}) output_df = output_df.set_index('index') @@ -86,7 +100,14 @@ def issueQueries(): output_df.insert(6, 'user', [np.nan] * num_queries) output_df.insert(7, 'sys',[np.nan] * num_queries) output_df.insert(8, 'argument_0', argument_list) - output_df.insert(9, 'validation', validation_list) + if any(avg_list): + output_df.insert(9, 'validation', avg_list) + elif any(port_list): + output_df.insert(9, 'validation', port_list) + else: + output_df.insert(9, 'validation', hits_list) + output_df.insert(10, 'instance', ["m5d.2xlarge"] * num_queries) + output_df.insert(11,'took', took_list) output_df.to_csv(output_directory + output_file, na_rep='NaN') return output_df @@ -94,14 +115,17 @@ def issueQueries(): def restartElastic(): os.system("sudo systemctl stop elasticsearch.service") + print("Elastic Stopped") os.system("sudo systemctl start elasticsearch.service") + print("Elastic Started") os.system("sleep 30") + print("Sleep Complete") -# Aggregation prep/clean due to: -# "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. -# Please use a keyword field instead. -# Alternatively, set fielddata=true on [id.resp_h] in order to load field data by uninverting the inverted index. +# Aggregation prep/clean due to: +# "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. +# Please use a keyword field instead. +# Alternatively, set fielddata=true on [id.resp_h] in order to load field data by uninverting the inverted index. # Note that this can use significant memory." def prepForAggregation(fieldname):