TitanFuzz
终于写完了
ev_generation.py
主程序
参数
- –model_name:使用的模型,两种:
- facebook/incoder-1B
- facebook/incoder-6B
- –library:扫哪个库:
- tf
- torch
- –api:要扫描的api:
all:全部
指定api
- –apilist:要扫描的api的文件路径,如:
- tf:data/tf_apis.txt
- torch:data/torch_apis.txt
- –startid:从几号种子开始,默认0
- –endid:到几号种子结束,默认-1
- –folder:工作目录,默认Results/${TEST_LIB}
- –seedfolder:种子库路径,默认:
- tf:../codex_seed_programs/codex_tf_seeds/fix
- torch:../codex_seed_programs/codex_torch_seeds/fix
- –use_sample_apis:是否使用样例api
- 是
- tf:data/tf_apis_100sample.txt
- torch:data/torch_apis_100sample.txt
- 否
- tf:data/tf_apis.txt
- torch:data/torch_apis.txt
- 是
- –random_seed:随机数的种子
- –max_valid:最大有效种子数
- –batch_size:一次前向传播中同时处理的样本数量
- –timeout:时间限制
- –seed_pool_size:种子池大小
- –only_valid:只使用可以成功执行的seed
- –relaxargmut:随意替换argument
- –seed_selection_algo:选取种子的标准:
- random:随机选取种子
- fitness:根据fitness score选择种子,还分三种:
- fitness:api call数+深度-重复数
- fitnessue:api call数-重复数
- fitnessud:api call数+深度
- fitnessde:深度-重复数
- coverage:根据覆盖率选择种子
- –mutator_selection_algo:多臂赌博机算法权衡策略
heuristic:启发式
epsgreedy:epsilon-greedy算法
ucb:Upper Confidence Bound算法
random:随机选择
ts:Thompson Sampling算法
- –use_single_mutator:只使用一种变异方法
- –replace_type:替代类型
- argument:替换参数
- keyword:替换关键字
- method:替换方法
- prefix:前缀替换几行
- prefix-argument:两者组合
- suffix:后缀替换几行
- suffix-argument:两者组合
- –mutator_set:变异策略,包括不同的replace_type策略组合
- all:全部
- noprefix:无prefix
- nosuffix:无suffix
- noargument:无argument
- nomethod:无method
- –validate_mode:种子验证方式
- process:使用process执行验证
- multiprocess:使用执行验证
- exec:使用exec执行验证
- –close_fd_mask:用flags的形式设置是否关闭stdout和stderr
main
参数解析,主要说明一下–api all的处理:
–apilist不为空,则使用apilist,如设置了startid和endid则只取第startid到第endid个api
1
2
3
4
5
6if args.apilist is not None:
with open(args.apilist, "r") as f:
all_apis = f.read().splitlines()
if args.endid != -1:
all_apis = all_apis[: args.endid]
all_apis = all_apis[args.startid :]–apilist为空:
- 设置–use_sample_apis:
- torch:data/torch_apis_100sample.txt
- tf:data/tf_apis_100sample.txt
- 未设置–use_sample_apis:
- torch:data/tf_apis.txt
- tf:data/torch_apis.txt
- 设置–use_sample_apis:
然后对于获取的每个api重新设置–api参数再次执行命令,非常朴实无华
1
2
3
4
5
6for api_idx, api in enumerate(all_apis):
print("[{} / {}] {}".format(api_idx, num_apis, api))
# 检查种子文件是否存在和工作目录是否不为空
run_args_api = run_args.copy()
run_args_api[ind] = api
run_cmd(run_args_api, timeout=args.timeout + 50, verbose=True)run_cmd见util/util.py
输出示例:
1
2
3
4
5
6
7Current directory: /mnt/workspace/fuzz/TitanFuzz
Results will be dumped to: /mnt/workspace/fuzz/TitanFuzz/Results
[0 / 2] tf.nn.conv2d
output.returncode: 0
stdout> Current directory: /mnt/workspace/fuzz/TitanFuzz
Results will be dumped to: /mnt/workspace/fuzz/TitanFuzz/Results
> api: tf.nn.conv2d初始化test_executor,这个executor用于
1
mp_executor.init_test_executor(args, cov=(args.seed_selection_algo == "coverage"))
初始化与是否需要记录覆盖率有关,init_test_executor见mycoverage/mp_executor.py
初始化SpanLM
1
model = SpanLM(args.model_name, batch_size=args.batch_size)
SpanLM见model.py
设置随机数种子
1
set_seed(args.random_seed)
1
2
3def set_seed(seed: int):
random.seed(seed)
np.random.seed(seed)进入generate
1
generate(args, model)
kill executor
1
mp_executor.kill_executors()
没什么好见的doge
generate
工作目录Results/${TEST_LIB}下创建过程目录:
- seed:种子
- valid:有效
- flaky:执行crash但验证没crash
- hangs:超时
- crash:crash
- exception:exception
- notarget:有些推理的output可能没有目标api,此时的output可以作为一个有用的seed所以放在一个单独的文件夹
并写了一个arg.txt记录参数
获取初始种子:
1
2
3apis = get_initial_programs(
args.seedfolder, infill_ph, args.library, "argument", target_api=args.api
)get_initial_programs见process_file.py
对于每个seed,执行:
调用validate_status验证种子是否有效
1
2
3
4
5
6status, msg = validate_status(
seed["original"],
args.library,
validate_mode=args.validate_mode,
test_executor=mp_executor.test_executor,
)validate_mode就是字面意思,除了multiprocess使用test_executor执行
最后得到一个seeds_for_generation
进入generate_loop:
需要记录:
1
2
3
4
5
6
7
8
9
10
11
12
13
14gen_ret[api] = {}
gen_ret[api]["seeds"] = seeds_for_generation
gen_ret[api]["initials"] = seeds_for_generation
(
gen_ret[api]["outputs"],
gen_ret[api]["p"],
gen_ret[api]["crashes"],
gen_ret[api]["g_time"],
gen_ret[api]["v_time"],
gen_ret[api]["tot_time"],
gen_ret[api]["tot_prog"],
) = generate_loop(
args, model, seeds_for_generation, api, logger, args.max_valid
)gen_ret会被记录到工作目录下(Results/${TEST_LIB}/outputs.json)
generate_loop见ev_generation.py
把crash的seed记录至工作目录下(Results/${TEST_LIB})
fuzz loop主要流程
- 种子选取:
- 随机选取
- 根据fitness score选取
- 变异策略选取:多臂赌博机算法
- 变异,生成一批output
- 种子更新:
- 根据覆盖率选择种子时如果种子没有提高覆盖率则将种子删除
- 将output加入种子库:
- 如果根据fitness score选取种子还需要更新种子库的小根堆
generate_loop
根据self.seed_selection_algo初始化GA_class:
- fitness:GAR_depth
- random:GA_Random
- coverage:GA_Coverage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16ga = GA_class(
original_codes,
num_selection,
args.batch_size,
args.folder,
api,
model.infill_ph,
args.library,
args.relaxargmut,
args.seed_selection_algo,
args.mutator_selection_algo,
args.use_single_mutator,
args.replace_type,
args.seed_pool_size,
args.mutator_set,
)GAR_depth、GA_Random、GA_Coverage见util/Seed_pool.py
进入fuzz大循环,条件
1
2
3while (max_valid < 0 or num_valid < max_valid) and sum(
total_run_time
) < args.timeout:选择一批种子(其实是一个)
1
2
3selections = ga.selection()
# ……
for seed, infill_code, replace_type in selections:selection见util/Seed_pool.py
推理
1
2
3well, early_stop, outputs = model.model_predict_multi(
infill_code, do_sample=True, num_samples=args.batch_size
)model_predict_multi见model.py
对于每个预测的output:
调用clean_code清理代码
clean_code见process_file.py
调用dead_code_elim清除死代码
dead_code_elim见util/clean_code.py
调用SnippetInfill计算num_replaced
1
2
3
4
5
6
7num_replaced, _, _ = SnippetInfill(
mask_identifier=model.infill_ph,
api_call=api.split(".")[-1],
prefix=".".join(api.split(".")[1:-1]),
library=args.library,
replace_type="argument",
).add_infill(output)
SnippetInfill见util/instrumentor.py
调用validate_status验证output有效性
validate_status见ev_generation.py/generate有讲解
如果num_replaced为0即所有目标api被遮盖,则将output放入notarget文件夹进入下一轮fuzz循环
根据不同的output执行结果放入对应文件夹
SUCCESS:valid
TIMEOUT:hangs
CRASH:需要用process模式再次验证
- CRASH:crash,并放入工作目录一份
- 没crash:flasky,之前的crash可能是因为某些玄学因素
EXCEPTION:execption
如果种子选择模式为coverage且output有效还要调用coverate_run_status_mp收集覆盖率
1
2
3
4
5status_, new_coverage = mp_executor.coverate_run_status_mp(
output, args.library, cov_executor=mp_executor.cov_executor
)
print("> coverage run: ", status_, new_coverage)
add_flags.append(new_coverage)
ga更新
1
2
3
4if args.seed_selection_algo == "coverage":
ga.update(seed, generations, replace_type, r, filenames, add_flags)
else:
ga.update(seed, generations, replace_type, r, filenames)update见util/Seed_pool.py
log
- Round:变异轮数
- New Valid:本轮生成出的有效的output数
- generation:本轮变异生成用时
- validation:本轮验证output有效性总用时
1
2--- Round : 21 ---
--- New Valid : 0 using 0.9325060844421387s generation, 0s validation ---
调用get_highest_order_output获取最大变异层数和相应种子的info,并输出种子内容
1
n, highest_order = ga.get_highest_order_output()
1
2
3
4
5
6
7
8Highest Order: 1
-----
input_data = tf.Variable(np.random.rand(10, 9, 9, 3), dtype=np.float32)
filter_data = tf.Variable(np.random.rand(3, 3, 3, 16), dtype=np.float32)
y = tf.nn.conv2d(input_data, filter_data, strides=[1, 1, 1, 1], padding='SAME')
-----get_highest_order_output见util/Seed_pool.py
log
生成有效output总数,生成总用时,验证总用时
1
33 valid outputs using 44.97628092765808s generation, 16.274909734725952s validation
生成output总数,各类output数
1
121 generated: 46 exceptions 39 duplicated 0 crashes 0 timeouts 3 notargetd
model.py / class SpanLM
__init__
load model
获取tokenizer
设置infill_ph
1
self.infill_ph = "<|mask:{}|>"
build_input_multi
1 |
|
在已经mask过的代码后面添加两个mask:
- 第一个是mask的数量
- 第二个是需要推理的mask的idx
比如:
例一:
1
2
3
4
5
6
7infill = '''
conv = torch.<|mask:0|>(1, 1, 3)
'''
n_infill = '''
conv = torch.<|mask:0|>(1, 1, 3)<|mask:1|><|mask:0|>
'''例二:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28infill = '''
import torch
import numpy as np
<|mask:0|>
conv2d = torch.nn.Conv2d(<|mask:1|>)
output = conv2d(input_data)
'''
n_infill = '''
import torch
import numpy as np
<|mask:0|>
conv2d = torch.nn.Conv2d(<|mask:1|>)
output = conv2d(input_data)<|mask:2|><|mask:0|>
'''
tmp_prompt = [
'''
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)<|mask:2|><|mask:0|>
import torch.nn as nn\n
import torch.nn.functional as F<|endofmask|><|mask:1|>
''',
# ……
]
model_predict_multi
举两个例子说明:
如果只有一个mask需要推理:
1
2
3infill = '''
conv = torch.<|mask:0|>(1, 1, 3)
'''先调用build_input_multi添加额外的mask
1
n_infill = 'conv = torch.<|mask:0|>(1, 1, 3)<|mask:1|><|mask:0|>'
再repeat成batch_size长度的列表
generate,结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44o = [
'''<|endoftext|>
conv = torch.<|mask:0|>(1, 1, 3)
<|mask:1|>
<|mask:0|>
……
''',
'''<|endoftext|>
conv = torch.<|mask:0|>(1, 1, 3)
<|mask:1|>
<|mask:0|>
nn.Conv2d(n_channels, n_filters[0], kernel_size=(7, 7), stride=2, padding=
<|endofmask|>
……
''',
'''<|endoftext|>
conv = torch.<|mask:0|>(1, 1, 3)
<|mask:1|>
<|mask:0|>
nn.functional.max_pool1d(x,
<|endofmask|>
……
''',
'''<|endoftext|>
conv = torch.<|mask:0|>(1, 1, 3)
<|mask:1|>
<|mask:0|>
nn.functional.pad(conv,
<|endofmask|>
……
''',
'''<|endoftext|>
conv = torch.<|mask:0|>(1, 1, 3)
<|mask:1|>
<|mask:0|>
randn
<|endofmask|>
……
'''
]用第二个<|mask:0|>和<|endofmask|>之间的代码代替<|mask:0|>,如果没有<|endofmask|>则丢弃
1
2
3
4
5
6outputs = [
'conv = torch.nn.Conv2d(n_channels, n_filters[0], kernel_size=(7, 7), stride=2, padding=(1, 1, 3)',
'conv = torch.nn.functional.max_pool1d(x, (1, 1, 3)',
'conv = torch.nn.functional.pad(conv, (1, 1, 3)',
'conv = torch.randn(1, 1, 3)'
]
如果有多个mask需要推理:
1
2
3
4
5
6
7infill = '''
import torch
import numpy as np
<|mask:0|>
conv2d = torch.nn.Conv2d(<|mask:1|>)
output = conv2d(input_data)
'''每次循环前都要调用build_input_multi添加额外的mask
1
2
3
4
5
6
7n_infill = '''
import torch
import numpy as np
<|mask:0|>
conv2d = torch.nn.Conv2d(<|mask:1|>)
output = conv2d(input_data)<|mask:2|><|mask:0|>
'''再repeat成batch_size长度的列表
generate,结果1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69o = [
'''<|endoftext|>
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
import torch.nn as nn\n
import torch.nn.functional as F
<|endofmask|>
……
''',
"""<|endoftext|>
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
import math
<|endofmask|>
……
""",
'''
<|endoftext|>import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
input_data = torch.randn(1,3,32, 32)
<|endofmask|>
……
''',
'''<|endoftext|>
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
from torch.nn import functional as F\n
input_data = torch.autograd.Variable(torch.randn(2,5,7,7).cuda())
<|endofmask|>
……
''',
'''<|endoftext|>
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
import matplotlib.pylab as plt\n
from torchvision.models import resnet
<|endofmask|>
……
'''
]clean code,添加mask后的tmp_prompt,需要推理第二个mask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69tmp_prompt = [
'''
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
import torch.nn as nn\n
import torch.nn.functional as F
<|endofmask|>
<|mask:1|>
''',
'''
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
import math
<|endofmask|>
<|mask:1|>
''',
'''
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
input_data = torch.randn(1,3,32, 32)
<|endofmask|>
<|mask:1|>
''',
'''
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
from torch.nn import functional as F\n
input_data = torch.autograd.Variable(torch.randn(2,5,7,7).cuda())
<|endofmask|>
<|mask:1|>
''',
'''
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
import matplotlib.pylab as plt\n
from torchvision.models import resnet
<|endofmask|>
<|mask:1|>
'''
]generate,结果2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89o = [
'''<s><s><s><s><s><s><s><s><s><s><s><s><s><s><s><s><s><|endoftext|>
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
import torch.nn as nn\n
import torch.nn.functional as F
<|endofmask|>
<|mask:1|>
3, 64, kernel_size=4, padding=1)\n
\n
print(type(conv2d))\n
print(conv2d
<|endofmask|>
……
''',
'''
<s><s><s><s><s><s><s><s><s><s><s><s><s><s><s><s><s><s><s><|endoftext|>
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
import math
<|endofmask|>
<|mask:1|>
3,16, 3, padding=1)\n
print(conv2d)\n
print(output_np
<|endofmask|>
……
''',
'''<s><s><s><s><s><s><s><s><s><s><s><s><|endoftext|>
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
input_data = torch.randn(1,3,32, 32)
<|endofmask|>
<|mask:1|>
2,4,3,padding = 1, bias = False
<|endofmask|>
……
''',
'''<|endoftext|>
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
from torch.nn import functional as F\n
input_data = torch.autograd.Variable(torch.randn(2,5,7,7).cuda())
<|endofmask|>
<|mask:1|>
……
''',
"""
<s><s><s><s><s><s><s><s><s><s><|endoftext|>
import torch\n
import numpy as np\n
<|mask:0|>\n
conv2d = torch.nn.Conv2d(<|mask:1|>)\n
output = conv2d(input_data)
<|mask:2|>
<|mask:0|>
import matplotlib.pylab as plt\n
from torchvision.models import resnet
<|endofmask|>
<|mask:1|>
in_channels = 3, out_channels = 3, kernel_size = 3
<|endofmask|>
……
"""
]用<|mask:0|>到<|endofmask|>之间的部分替换<|mask:0|>,用<|mask:1|>到<|endofmask|>之间的部分替换<|mask:1|>,缺少<|endofmask|>则丢弃
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40outputs = [
'''
import torch\n
import numpy as np\n
import torch.nn as nn\n
import torch.nn.functional as F\n
conv2d = torch.nn.Conv2d(3, 64, kernel_size=4, padding=1)\n
\n
print(type(conv2d))\n
print(conv2d)\n
output = conv2d(input_data)
''',
'''
import torch\n
import numpy as np\n
import math\n
conv2d = torch.nn.Conv2d(3,16, 3, padding=1)\n
print(conv2d)\n
print(output_np)\n
output = conv2d(input_data)
''',
'''
import torch\n
import numpy as np\n
input_data = torch.randn(1,3,32, 32)\n
conv2d = torch.nn.Conv2d(2,4,3,padding = 1, bias = False)\n
output = conv2d(input_data)
''',
'''
import torch\n
import numpy as np\n
import matplotlib.pylab as plt\n
from torchvision.models import resnet\n
conv2d = torch.nn.Conv2d(in_channels = 3, out_channels = 3, kernel_size = 3)\n
output = conv2d(input_data)
'''
]
process_file.py
get_initial_programs
种子库目录../codex_seed_programs/codex_${TEST_LIB}_seeds/fix
因为此时target_api不可能为all,所以apis只有一个
1
apis = [os.path.join(directory, target_api)]
读取所有目标api种子文件,比如../codex_seed_programs/codex_tf_seeds/fix/tf.ones/*,处理每个种子
清理种子文件
1
2
3original = clean_code(
f.read(), prints_and_imports=True, comment=True, cuda=True
)clean_code见process_file.py
初始化SnippetInfill
1
2
3
4
5
6
7infill = SnippetInfill(
mask_identifier=mask_identifier,
api_call=api_call,
prefix=".".join(api.split("/")[-1].split(".")[1:-1]),
library=library,
replace_type=replace_type,
)SnippetInfill见util/instrumentor.py
输出:
- Syntax error:语法错误seed数量
- Multi-API calls:0,乐,这玩意是干嘛的
- No Api calls:没有目标api的seed数量
- Successful:替换成功的seed数量
返回:ret
1
2
3ret[api.split("/")[-1]].append(
{"original": original_code, "infill": infill_code}
)
btw,SnippetInfill替换完的infill_code没用:)
clean_code
主要包括:
清除函数定义
清除tags,由于seed是codex生成的,可能会有如下形式的tag
1
tag_pattern = re.compile(r"<.+>") # such as <cell>, <test>, </cell>
去掉一些api
1
2
3
4
5
6
7
8bad_codes = [
"tf.test.main()",
"tf.compat.v1.test.main()",
"disable_eager_execution()",
"disable_v2_behavior",
"InteractiveSession",
"exit()",
]去除种子设置
1
return re.sub("torch\.manual_seed\(\S+\)", "", code)
去除空行
去除主函数
1
2code = original.replace('if __name__ == "__main__":', "if True:")
code = code.replace("if __name__ == '__main__':", "if True:")去除print、import和from
去除cuda设置
去除session
1
2
3
4
5
6
7
8session_pattern1 = re.compile(r"with tf.compat.v1.Session()")
session_pattern2 = re.compile(r"with tf.Session()")
def remove_session(original):
code = re.split(session_pattern1, original)[0]
code = re.split(session_pattern2, code)[0]
return code
以上清理是使用模式匹配完成的
以下清理是使用ast完成的:
清理comments
先去除#开头的行
PassRemoveDocstring类去除docstring
1
2
3
4
5
6
7try:
root = ast.parse(code)
PassRemoveDocstring().remove_docstring(root)
modified = ast.fix_missing_locations(root)
code_cleaned = astunparse.unparse(root)
except:
return codePassRemoveDocstring见util/astpasses.py
语法修复,就是通过不断去除最后一行再parse、unparse、parse直到没有语法错误
去除name的keyword,形如
1
test(name='test')
通过SearchAllCall获取所有call结点
去除name的keyword
1
2
3
4if "keywords" in dir(
node
): # all calls should have keyword arguments already
node.keywords = [a for a in node.keywords if a.arg != "name"]
mycoverage
mp_executor.py
全局两个executor,一个模式flag
1 |
|
init_test_executor
传递参数close_fd_mask和debug
torch:
需要收集覆盖率
1
cov_executor = PyTorchCoverageExecutor(**kwargs)
初始化test_executor
1
test_executor = PyTorchExecutor(**kwargs)
tf:
设置环境变量TF_CPP_MIN_LOG_LEVEL=3
设置tf.logger
1
tf.get_logger().setLevel("ERROR")
tensorflow设置内存动态增长
传递参数cpu=RUN_CPU_MODE
需要收集覆盖率
1
cov_executor = TensorFlowCoverageExecutor(**kwargs)
初始化test_executor
1
test_executor = TensorFlowExecutor(**kwargs)
exec_func
单纯读取文件+执行
exec_func_tf_cpu
单纯读取文件+设置cpu+执行
cov_worker_tf
关闭输出的方式是直接导到/dev/null
1
2
3
4
5f = open(os.devnull, "w")
if close_fd_mask & 1:
sys.stdout = f
if close_fd_mask & 2:
sys.stderr = f设置回调函数,用于获取coverage
1
sys.settrace(tracer.trace_tf)
trace_tf见mycoverage/tracer.py
接受server传递的测试文件名
执行测试文件
回传错误信息或覆盖率
cov_worker_torch
类似tf
worker_tf
少一个覆盖率,其他一样
worker_torch
少一个覆盖率,其他一样
class Executor
所有的executor都相当于一个server和一个client,通过一对pipe通信
参数:
- worker:client,使用child_conn
- single_test_timeout:单次测试timeout,默认10
- close_fd_mask:需要关闭的pipe(stdout和stderr)
- exec_func:执行测试文件使用的执行命令,默认为exec_func
- debug:是否开启debug模式
__init__:
开启client
1
2
3
4
5self._p = ctx.Process(
target=self.worker,
args=(self._exec_func, self.child_conn, self._close_fd_mask),
)
self._p.start()设置test程序的文件夹路径
1
2
3self._test_dir = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "test_programs"
)
run_test:
- server向client发送测试文件名
- 轮询client,直到timeout重启client
- 等待client输出
class CoverageExecutor
父类Executor
__init__:
1
2
3
4def __init__(self, worker, single_test_timeout=10, **kwargs):
super().__init__(worker, single_test_timeout, **kwargs)
self.prev_coverage = 0
print("Init cov executor")run_test:
server向client发送测试文件名
轮询client,直到timeout重启client
等待client输出,报告并更新coverage
1
Cov: 502 -> 813
class TensorFlowCoverageExecutor
父类CoverageExecutor
__init__:
worker:cov_worker_tf
exec_func:
- cpu模式:exec_func_tf_cpu
- 非cpu模式:exec_func
设置trace的library
1
tracer.trace_library = "tf"
client执行memory_growth,并测试连通性
1
self.run_test(os.path.join(self._test_dir, "set_memory_growth.py"))
设置check程序
1
self.check_filename = os.path.join(self._test_dir, "check_tf_state.py")
class TensorFlowExecutor
父类Executor
__init__:
worker:worker_tf
exec_func:
- cpu模式:exec_func_tf_cpu
- 非cpu模式:exec_func
client执行memory_growth,并测试连通性
1
self.run_test(os.path.join(self._test_dir, "set_memory_growth.py"))
设置check程序
1
self.check_filename = os.path.join(self._test_dir, "check_tf_state.py")
class PyTorchExecutor
父类Executor
__init__:
worker:worker_torch
设置check程序
1
self.check_filename = os.path.join(self._test_dir, "check_torch_state.py")
class PyTorchCoverageExecutor
父类CoverageExecutor
__init__:
worker:cov_worker_torch
设置trace的library
1
tracer.trace_library = "torch"
设置check程序
1
self.check_filename = os.path.join(self._test_dir, "check_torch_state.py")
coverage.py
全局
- trace_library:要跟踪的库
- prev_line:前一行
- prev_filename:前一个文件
- data:coverage
设置跟踪函数,跟踪函数三个参数,frame,event,arg
1 |
|
- event取值
- call:函数被调用时触发
- return:函数返回时触发
- exception:函数抛出异常时触发
- line:语句执行
- frame
- f_code:当前帧正在执行的代码
- f_lineno:当前帧正在执行的代码行号
记录文件内的跳转和文件间的跳转
1 |
|
util
Seed_pool.py
class GA
__init__:
只需要注意self.num_generated初始化为batch_size
load api symbols
1
self.api_call_list, self.full_api_list = load_api_symbols(library)
比如aaa.bbb.ccc(),self.api_call_list包括ccc,self.full_api_list包括aaa.bbb.ccc
初始化SnippetInfillArbitratyAPI
1
2
3self.recursive_infill = SnippetInfillArbitratyAPI(
mask_identifier, self.api_call_list, self.full_api_list, self.library
)SnippetInfillArbitratyAPI见util/instrumentor.py
调用_init_seed初始化种子,由子类分别实现
调用_init_mutator初始化变异器
_init_mutator:
根据self.mutator_set设置self.replace_type
根据self.mutator_selection_algo初始化变异器相关:
heuristic:self.replace_type_p初始化
1
2
3
4
5
6
7
8
9self.replace_type_p = {
"argument": self.num_generated * 3,
"keyword": self.num_generated * 3,
"method": self.num_generated * 3,
"prefix": self.num_generated * 3,
"prefix-argument": self.num_generated * 3,
"suffix": self.num_generated * 3,
"suffix-argument": self.num_generated * 3,
}使用多臂赌博机算法:调用_init_multi_arm
_init_multi_arm:epsilon默认0.1,replace_type_p两个维度分别为成功次数和失败次数
1
2
3
4
5
6self.replace_type_p = {}
for t in self.replace_type:
# Initialize with 1 to avoid starving
# Change initial state to 0.5 following the Thompson Sampling algorithm
self.replace_type_p[t] = [1, 2] # success / total
self.epsilon = 0.1selection:
num_selection默认是1
调用_select_seed选一个种子,由子类实现,选完会把种子从种子库中移除(注:GAR_depth不会)
调用_select_mutator选一个replace_type
如果replace_type为argument、keyword或method,调用add_infill进行mask
1
2
3
4
5
6
7
8
9(
num_replaced,
infill_code,
original_code,
) = self.recursive_infill.add_infill(
o_ast,
add_keywords=(replace_type == "keyword"),
replace_method=(replace_type == "method"),
)如果没有可以mask的则使用SnippetInfill把argument全mask了
1
2
3
4
5
6
7
8infill = SnippetInfill(
mask_identifier=self.mask_identifier,
api_call=self.api_call.split(".")[-1],
prefix=".".join(self.api_call.split(".")[1:-1]),
library=self.library,
replace_type="argument",
)
num_replaced, infill_code, _ = infill.add_infill(code)如果是其他的replace_type比如prefix和suffix,则使用SnippetInfill进行mask
1
2
3
4
5
6
7
8infill = SnippetInfill(
mask_identifier=self.mask_identifier,
api_call=self.api_call.split(".")[-1],
prefix=".".join(self.api_call.split(".")[1:-1]),
library=self.library,
replace_type=replace_type,
)
num_replaced, infill_code, _ = infill.add_infill(code)
_select_mutator:
heuristic:分数概率化随机选择
1
2
3
4
5
6
7
8replace_type = np.random.choice(
self.replace_type,
1,
p=[
self.replace_type_p[x] / sum(list(self.replace_type_p.values()))
for x in self.replace_type
],
)[0]random:就随机选择
1
replace_type = np.random.choice(self.replace_type)
epsgreedy:epsilon-greedy
- 1-epsilon:选择当前累积回报最高的臂
- epsilon:选择未知臂
这时self.replace_type_p是(success, total),以success/total计算回报
ucb:Upper Confidence Bound
计算每个臂的ucb值,选择ucb值最高的臂
- t:回合数
- ni:第i臂被选择的次数,估计奖励是第i臂的平均奖励估计
- 估计奖励:这里使用成功率success/total
ts:Thompson Sampling
贝叶斯定理
- P(A):A的先验概率
- P(A|B):已知B发生A发生的概率,A的后验概率
- P(B):B的先验概率
- P(B|A):B的后验概率
后验概率 = (相似度 * 先验概率)/ 标准化常量
or
后验概率 = 标准相似度 * 先验概率
这里A是成功次数,B是失败次数
先验分布使用beta分布,这样后验分布也是beta分布
1
2
3
4
5
6
7
8scores = []
for a in self.replace_type:
alpha, n_t = self.replace_type_p[a]
beta = n_t - alpha
score_a = np.random.beta(alpha, beta)
scores.append(score_a)
max_index = np.argmax(scores)
return self.replace_type[max_index]
_update_mutator:
heuristic:更新被选择的方法的score,看注释吧
1
2
3
4
5
6
7
8# update the global counter
# roughly that score increases when at least 1/4 of generation is valid and unique
self.replace_type_p[replace_type] += len(generations) - 1 / 3 * (
self.num_generated - len(generations)
)
self.replace_type_p[replace_type] = max(
1, self.replace_type_p[replace_type]
)random:无需更新
多臂赌博机:success和total分别+1
update:
- 调用_update_seed更新种子,由子类实现
- 调用_update_mutator更新变异策略相关分数
- 对于每个种子调用_add_new_seed添加种子进种子库self.seeds,由子类实现
get_highest_order_output:
- highest_order为最大变异层数,n为对应种子的info
- 并输出max_depth、重复api call次数和api call中带keyword的个数
class GA_Random
_init_seed:单纯append
1
2
3
4
5
6
7
8
9
10self.seeds = []
self.info_code = {}
for idx, seed in enumerate(initial_seeds):
self.seeds.append(seed)
self.info_code[seed] = {
"mutation_layer": 0,
"used_as_seed": 0,
"parent": None,
"filename": "{}_{}{}.py".format(self.api_call, "seed", idx + 1),
}_select_seed:
就是随机选取
1
2
3
4def _select_seed(self):
code = np.random.choice(self.seeds)
self.seeds.remove(code)
return code_update_seed:
更新的info,再把种子放回去,emmmmm
1
2
3def _update_seed(self, code, value):
self.info_code[code]["used_as_seed"] += self.num_generated
self.seeds.append(code)_add_new_seed:
将生成的output(code)放进种子库self.seeds
1
2
3
4
5
6
7
8
9
10
11if code not in self.info_code:
self.num_valid += 1
self.info_code[code] = {
"mutation_layer": self.info_code[seed]["mutation_layer"] + 1,
"used_as_seed": 0,
"parent": seed,
"replace_type": replace_type,
"round": rd,
"filename": filename,
}
self.seeds.append(code)
class GA_Coverage
- _init_seed:同GA_Random
- _select_seed:同GA_Coverage
- update:比GA多了个add_flag,只有覆盖率更新才添加种子
- _update_seed:同GA_Coverage
- _add_new_seed:同GA_Coverage
class GAR_depth
_compute_fitness_score:
获取max_depth
1
max_depth = DepthFinder(self.library).max_depth(code)
DepthFinder见util/instrumentor.py
获取unique_calls和ex_repeats
1
unique_calls, ex_repeats, repeats = UniqueFinder(self.library).count(code)
UniqueFinder见util/instrumentor.py
不同的fitness计算方式不同:
- fitness:unique_calls + max_depth - ex_repeats
- fitnessue:unique_calls - ex_repeats
- fitnessud:unique_calls + max_depth
- fitnessde:max_depth - ex_repeats
_init_seed:
对于每个seed:
- 调用_compute_fitness_score打分
- 取长度的负数
以这两个数为权重,将所有的seed存入一个小根堆
1
2
3
4heapq.heappush(
self.seeds,
(self._compute_fitness_score(seed), -len(seed.splitlines()), seed),
)保证种子池中种子数小于self.seed_pool_size
_select_seed:
以softmax过的scores为概率,随机选取
1
2
3
4
5
6
7
8
9
10def _softmax(self, x):
e_x = np.exp(x - np.max(x))
return e_x / e_x.sum()
def _select_seed(self):
_seed_scores = [rec[0] for rec in self.seeds]
codes = [rec[-1] for rec in self.seeds]
probs = self._softmax(_seed_scores)
code = np.random.choice(codes, p=probs)
return code_update_seed:只更新info
1
2def _update_seed(self, code, value):
self.info_code[code]["used_as_seed"] += self.num_generated_add_new_seed:多了一步给种子打分,push进小根堆,并pop出低分种子直到种子池满
1
2
3
4
5
6
7heapq.heappush(
self.seeds,
(self._compute_fitness_score(code), -len(code.splitlines()), code),
)
if self.seed_pool_size > 0:
while len(self.seeds) > self.seed_pool_size:
heapq.heappop(self.seeds)
instrumentor.py
python的ast树默认遍历方式为先根遍历,以下是一个ast树的例子
1 |
|
class SnippetInfill
继承自ast.NodeTransformer
visit:需要单独处理ast.Call
visit_Call:
获取所有api调用,计数self.num_replaced
如果self.replace为False则只是返回
如果self.replace_type包含argument,则设置args为mask并清除所有keywords
1
2
3
4
5if "argument" in self.replace_type:
node.args = [ast.Constant(value=self.mask_identifier)]
if "keywords" in dir(node):
node.keywords = []
self.replace = False # do not replace any later api calls计数最后一个目标api出现的行数到self.line_no
add_infill:
- 先parse再unparse,可以去掉多余的换行
- 先visit一次计数self.num_replaced
- 再visit一次进行更改
- 分类self.replace_type:
- argument:mask所有目标api的args
- prefix:去掉self.line_no前的若干行,并补充import
- suffix:去掉self.line_no后的若干行
- prefix-argument:综合两者
- suffix-argument:综合两者
- 返回:
- mask计数:self.num_replaced
- maks完的code:infill_code
- 原始code:original_code
class SnippetInfillArbitratyAPI
类似SnippetInfill
visit_Call:收集所有包括在self.full_api_list中的api call结点,加入self.call_nodes
add_infill:
从self.call_nodes随机选择一个结点mask:
替换方法:mask Attribute
1
2
3
4replace_node.func = ast.Attribute(
value=ast.Name(id=self.lib_prefix),
attr=self.mask_identifier.format(0),
)1
torch.<|mask:0|>(A, B)
替换argument:mask args,清空keywords
1
2
3
4
5replace_node.args = [
ast.Constant(value=self.mask_identifier.format(0))
]
if "keywords" in dir(replace_node):
replace_node.keywords = []1
torch.mm(<|mask:0|>)
替换keywords:添加一个keyword
1
2
3
4
5
6replace_node.keywords.append(
ast.keyword(
arg=self.mask_identifier.format(0),
value=ast.Constant(value=self.mask_identifier.format(1)),
)
)1
torch.mm(A, B, <|mask:0|> = <|mask:1|>)
class DepthFinder
继承自ast.NodeTransformer
depth计算方式:
没有参数,只有api调用,则depth为api层数
1
a = lib.getnum()
例:a的depth是1
有参数,则depth为max(参数的depth+api层数)
1
b = lib.add(lib.sub(a, 1), 0)
例:b的depth是2+1=3
相关成员含义:
- api_depth:目标库api调用层数
- current_var:被赋值的变量,第一次出现时是待计算的depth,需要忽略
一个例子:
1 |
|
max_depth=5
class SearchAllCall
继承自ast.NodeTransformer
self.call_nodes获取所有函数调用结点
class SearchAllLibCall
继承自SearchAllCall
忽略一些函数
1 |
|
class UniqueFinder
call_exp是api调用的完整表达,如
1
lib.call(test = 1, 2)
api_call只是api,如:
1
lib.call
unique_calls是api种类数
ex_repeats是api重复出现的次数
1
2
3
4if call_exp not in self.unique_found_call_exps:
self.unique_found_call_exps[call_exp] = 0
# ……
self.unique_found_call_exps[call_exp] += 11
sum([v - 1 for _, v in self.unique_found_call_exps.items()])
class UseDef
继承自ast.NodeTransformer
get_use_def:
- visit只处理name
- 记录Load到self.uses
- 记录Store到self.defs
uses和defs是两个字典,key是行号value是id的集合,返回uses、defs和ids,虽然ids没用,樂
uil.py
run_cmd
使用subprocess.run执行命令,输出returncode、stderr和stdout,程序结束状态和错误提示信息
输出:
1
2
3
4
5
6
7
8
9
10
11# verbose
output.returncode: {}
stdout> {}
stderr> {}
# verbose
# error_msg
---- returncode={} ----
stdout> {}
stderr> {}
# error_msgExecutionStatus
- CRASH
- SIGABRT Triggered
- SIGILL
- SIGTRAP
- SIGFPE
- OOM
- SIGBUS Triggered
- Segmentation Fault Triggered
- EXCEPTION
- SUCCESS
- CRASH
astpasses.py
class PassRemoveDocstring
继承了NodeTransformerWithPrePost又不用继承了个寂寞脑壳有点问题神金我看了半天!!!
Module里的Docstring就是body里一个只有Const的Expr,删了就行
1 |
|
clean_code.py
_dead_code_elim
- 以行号为结点,行内包含的ids的def-use为边建立邻接矩阵
- 保留所有没有def或use的行
- 跳过先use再def的行
- dfs更新所有目标行可达的行号successor,保留
- dfs更新所有可达successor的行号,保留
dead_code_elim
使用UseDef获取defs、uses和ids
UseDef见util/instrumentor.py
使用SearchCall获取call目标api的所有结点
SearchCall见util/astpasses.py
获取目标api出现的最小行号为target_line
调用_dead_code_elim删除不能到达目标行的行(返回保留行)
保留缩进行
去除注释
返回删除死代码后的代码