Mastering Common Hadoop DFS Commands

1. Using Hadoop

On the server, we operate on the local file system with Linux commands, such as ls to list file/directory information, cp to copy files, and cat to view file contents. The distributed file system offers a similar set of commands, and next we need to master the basic ones. (1 point)

xxxxxxxxxx@thumm01:~$ hadoop fs -ls /
Found 2 items
drwxr-xr-x - root supergroup 0 2023-10-06 20:48 /dsjxtjc
drwxrwxrwx - jtliu supergroup 0 2020-12-21 23:25 /tmp
xxxxxxxxxx@thumm01:~$ hadoop fs -ls /dsjxtjc/xxxxxxxxxx
xxxxxxxxxx@thumm01:~$ touch test.txt
xxxxxxxxxx@thumm01:~$ echo "Hello Hadoop" > test.txt
xxxxxxxxxx@thumm01:~$ cat test.txt
Hello Hadoop
xxxxxxxxxx@thumm01:~$ hadoop fs -copyFromLocal ./test.txt /dsjxtjc/xxxxxxxxxx/
2023-10-31 20:31:31,606 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
xxxxxxxxxx@thumm01:~$ hadoop fs -cat /dsjxtjc/xxxxxxxxxx/test.txt
2023-10-31 20:32:01,519 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Hello Hadoop

2. Viewing Hadoop's Status via the Web UI

The NameNode web interface listens on port 9870 inside the cluster, so we forward that port to the local machine with ssh -L and can then open http://localhost:9870 in a local browser.
Windows PowerShell
Copyright (C) Microsoft Corporation. All rights reserved.

Install the latest PowerShell for new features and improvements! https://aka.ms/PSWindows

PS C:\Users\jkup> ssh xxxxxxxxxx@10.103.9.11 -L 9870:192.169.0.101:9870
Welcome to Ubuntu 16.04.7 LTS (GNU/Linux 4.4.0-210-generic x86_64)

* Documentation: https://help.ubuntu.com
* Management: https://landscape.canonical.com
* Support: https://ubuntu.com/advantage

UA Infra: Extended Security Maintenance (ESM) is not enabled.

0 updates can be applied immediately.

307 additional security updates can be applied with UA Infra: ESM
Learn more about enabling UA Infra: ESM service for Ubuntu 16.04 at
https://ubuntu.com/16-04

Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
applicable law.

New release '18.04.6 LTS' available.
Run 'do-release-upgrade' to upgrade to it.


Last login: Tue Oct 31 19:57:30 2023 from 10.81.32.116


xxxxxxxxxx@thumm01:~$

Distributed File System

copyFromLocal

copyFromLocal uploads a local file into the DFS. Specifically, the client sends the file information to the NameNode via a new_fat_item request; the NameNode allocates space according to the file size and returns the allocation to the client in the form of a FAT table (see the new_fat_item function in name_node.py). The client then connects to the target DataNodes one by one according to the FAT table and sends the data blocks.

name_node.py

def new_fat_item(self, dfs_path, file_size):
    nb_blks = int(math.ceil(file_size / DFS_BLK_SIZE))
    print(file_size, nb_blks)

    # When DFS_REPLICATION > 1, each block records several host names
    data_pd = pd.DataFrame(columns=['blk_no', 'host_name', 'blk_size'])

    for i in range(nb_blks):
        blk_no = i
        host_name = np.random.choice(HOST_LIST, size=DFS_REPLICATION, replace=False)
        blk_size = min(DFS_BLK_SIZE, file_size - i * DFS_BLK_SIZE)
        data_pd.loc[i] = [blk_no, host_name, blk_size]

    # Local path of the FAT table
    local_path = os.path.join(NAME_NODE_DIR, dfs_path)
    # Create the directory if it does not exist
    os.system("mkdir -p {}".format(os.path.dirname(local_path)))
    # Save the FAT table as a CSV file
    data_pd.to_csv(local_path, index=False)
    # Also return the CSV content to the requesting node
    return data_pd.to_csv(index=False)

data_node.py

def store(self, sock_fd, dfs_path):
    # Receive the block data from the client
    chunk_data = sock_fd.recv(BUF_SIZE)
    # Local path of the block
    local_path = os.path.join(DATA_NODE_DIR, dfs_path)
    # Create the directory if it does not exist
    os.system("mkdir -p {}".format(os.path.dirname(local_path)))
    # Write the block to a local file
    with open(local_path, "wb") as f:
        f.write(chunk_data)

    return "Store chunk {} successfully~".format(local_path)

client.py

def copyFromLocal(self, local_path, dfs_path):
    file_size = os.path.getsize(local_path)
    print("File size: {}".format(file_size))

    request = "new_fat_item {} {}".format(dfs_path, file_size)
    print("Request: {}".format(request))

    # Ask the NameNode for a FAT table
    self.name_node_sock.send(bytes(request, encoding='utf-8'))
    fat_pd = self.name_node_sock.recv(BUF_SIZE)

    # Print the FAT table and parse it with pandas
    fat_pd = str(fat_pd, encoding='utf-8')
    print("Fat: \n{}".format(fat_pd))
    fat = pd.read_csv(StringIO(fat_pd))

    # Send each data block to its target DataNodes according to the FAT table
    with open(local_path) as fp:
        for idx, row in fat.iterrows():
            data = fp.read(int(row['blk_size']))
            datanode_hosts = re.findall(r"'(.+?)'", row['host_name'])  # extract host names with a regex
            for host in datanode_hosts:
                data_node_sock = socket.socket()
                data_node_sock.connect((host, DATA_NODE_PORT))
                blk_path = dfs_path + ".blk{}".format(row['blk_no'])

                request = "store {}".format(blk_path)
                data_node_sock.send(bytes(request, encoding='utf-8'))
                time.sleep(0.2)  # wait between the two sends to avoid TCP packet sticking
                data_node_sock.send(bytes(data, encoding='utf-8'))
                data_node_sock.close()

Extensions:

1. TCP packet sticking (the "粘包" problem)
Cause: in TCP socket programming, the sender and receiver each hold one end of a connected socket pair. To deliver many outgoing packets more efficiently, the sender applies an optimization (Nagle's algorithm) that merges several small, closely spaced writes into one larger chunk before packetizing it. Sticking can arise for several reasons and can originate on either the sending side or the receiving side.
Solutions:

The sender waits between two consecutive sends (poor performance), or the sender disables Nagle's algorithm (see the snippet below).

On the receiving side, the simple ideas below are the starting point; as requirements evolve they grow into an application-layer protocol:

  1. Put delimiter marks in the data stream, e.g. C strings end with a '\0' terminator, but this in turn raises escaping and safety issues.
  2. Agree on a message format, e.g. the first few bytes give the packet size; this is the method used later in this lab.

Only TCP has the sticking problem: TCP is stream-oriented, whereas UDP is message-oriented and preserves message boundaries.
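As a minimal illustration of the sender-side option above, Nagle's algorithm can be disabled in Python with a socket option. Note that this only stops the sender from coalescing small writes; TCP remains a byte stream, so the receiver still cannot assume message boundaries.

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Disable Nagle's algorithm so that small writes are pushed out immediately
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)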

2.StringIO

io.StringIO from the io module creates an in-memory file-like object, which can be used as input or output for most functions that expect a standard file object.
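A small sketch of how this is used above: the CSV text received from the NameNode is wrapped in StringIO so that pandas can read it as if it were a file (the CSV content here is just an illustrative fragment of a FAT table):

>>> from io import StringIO
>>> import pandas as pd
>>> csv_text = "blk_no,blk_size\n0,4096\n1,219\n"
>>> fat = pd.read_csv(StringIO(csv_text))
>>> fat['blk_size'].tolist()
[4096, 219]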

3. Regular expressions

datanode_hosts = re.findall(r"'(.+?)'", row['host_name'])

  • row['host_name'] is a string whose value is "['thumm01' 'thumm03' 'thumm05']".

  • The r prefix on the pattern denotes a raw string literal (backslashes are not treated as escape characters); a b prefix would denote a bytes pattern.

  • . matches any single character except "\n". To match any character including "\n", use a pattern such as "(.|\n)".

  • + matches the preceding sub-expression one or more times. For example, "zo+" matches "zo" and "zoo" but not "z". + is equivalent to {1,}.

  • ? here makes the match non-greedy. When it immediately follows another quantifier (*, +, ?, {n}, {n,}, {n,m}), the match becomes non-greedy, i.e. it matches as few characters of the searched string as possible, whereas the default greedy mode matches as many as possible. For example, against the string "oooo", "o+?" matches a single "o", while "o+" matches all of them.

  • re.findall(r"'(.+?)'", row['host_name']) returns a list: ['thumm01', 'thumm03', 'thumm05'] (reproduced in the interpreter below).
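For reference, the same call in a Python interpreter:

>>> import re
>>> re.findall(r"'(.+?)'", "['thumm01' 'thumm03' 'thumm05']")
['thumm01', 'thumm03', 'thumm05']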

pipeline.sh

Starting the DataNode processes one by one every time is tedious, so here is a script that controls all the DataNodes over SSH.

The kill command on line 10 of the script is not very effective: the background processes on the DataNodes cannot be cleaned up completely (insufficient permissions), so the workaround adopted here is to change the port and rerun. Sorry about that :(

#!/bin/bash
HOST_LIST=('thumm01' 'thumm03' 'thumm04' 'thumm05' "thumm06")

for server in ${HOST_LIST[@]}
do
    ssh $server "
        rm -rf ~/lab2/MyDFS/dfs/data/*;
        scp -r thumm01:~/lab2/MyDFS ~/lab2/
        cd ~/lab2/MyDFS;
        # kill -9 \$(ps -ef | grep MyDFS | awk '{print \$2}');
        python3 data_node.py;
    " &
done

wait

If line 11 of the script uses python and the script is run with bash pipeline.sh, you may get the error SyntaxError: Non-ASCII character '\xe6' in file data_node.py on line 7, but no encoding declared; see http://python.org/dev/peps/pep-0263/ for details, and also str() takes at most 1 argument (2 given).
This is because thumm05 and thumm06 default to Python 2. To avoid such problems, explicitly run the programs with the python3 interpreter.

xxxxxxxxxx@thumm05:~$ which python
/usr/bin/python
xxxxxxxxxx@thumm05:~$ ls -al /usr/bin/python
lrwxrwxrwx 1 root root 9 Nov 23 2017 /usr/bin/python -> python2.7
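In addition to invoking python3 explicitly, a way to guard against this is to put an interpreter line and a source-encoding declaration at the top of each script. A minimal sketch of the two header lines (the shebang only takes effect when the script is executed directly):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-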

Run

The code used here already includes redundant replication of the data blocks; see the data replication section for how the block slices are laid out on each DataNode.

xxxxxxxxxx@thumm01:~/lab2/MyDFS$ python client.py -copyFromLocal ./test/test_copyFromLocal.txt test_copyFromLocal.txt
File size: 8411
Request: new_fat_item test_copyFromLocal.txt 8411
Fat:
blk_no,host_name,blk_size
0,['thumm05' 'thumm01' 'thumm06'],4096
1,['thumm03' 'thumm01' 'thumm06'],4096
2,['thumm05' 'thumm01' 'thumm06'],219

copyToLocal

copyToLocal is the inverse operation of copyFromLocal.

name_node.py

def get_fat_item(self, dfs_path):
    # Read back the FAT table for this file
    local_path = os.path.join(NAME_NODE_DIR, dfs_path)
    response = pd.read_csv(local_path)
    return response.to_csv(index=False)

data_node.py

def store(self, sock_fd, dfs_path):
    # Receive the block data from the client
    chunk_data = sock_fd.recv(BUF_SIZE)
    # Local path of the block
    local_path = os.path.join(DATA_NODE_DIR, dfs_path)
    # Create the directory if it does not exist
    os.system("mkdir -p {}".format(os.path.dirname(local_path)))
    # Write the block to a local file
    with open(local_path, "wb") as f:
        f.write(chunk_data)

    return "Store chunk {} successfully~".format(local_path)
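The client-side copyToLocal below sends a load request and expects the raw block bytes back, so data_node.py also needs a load handler that reads a block from local disk. That handler is not listed here, so the following is only a minimal sketch, assuming the DataNode's dispatch loop sends whatever a handler returns back to the requesting client (just as it does with store()'s status string):

def load(self, sock_fd, dfs_path):
    # Read the requested block back from local disk
    local_path = os.path.join(DATA_NODE_DIR, dfs_path)
    with open(local_path, "rb") as f:
        chunk_data = f.read()
    # Assumed: the dispatch loop sends this return value back to the client
    return chunk_data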

client.py

  • Obtain the FAT table from the NameNode
  • Print the FAT table
  • According to the FAT table, request each data block from a target DataNode and write it into the local file
def copyToLocal(self, dfs_path, local_path):
    request = "get_fat_item {}".format(dfs_path)
    print("Request: {}".format(request))
    # Obtain the FAT table from the NameNode, print it, then request each
    # data block from a target DataNode and write it into the local file

    self.name_node_sock.send(bytes(request, encoding='utf-8'))
    fat_pd = self.name_node_sock.recv(BUF_SIZE)
    fat_pd = str(fat_pd, encoding='utf-8')
    print("Fat: \n{}".format(fat_pd))
    fat = pd.read_csv(StringIO(fat_pd))

    with open(local_path, "w") as fp:
        for idx, row in fat.iterrows():
            datanode_hosts = re.findall(r"'(.+?)'", row['host_name'])

            data_node_sock = socket.socket()
            data_node_sock.connect((datanode_hosts[0], DATA_NODE_PORT))
            blk_path = dfs_path + '.blk{}'.format(row['blk_no'])

            request = "load {}".format(blk_path)
            data_node_sock.send(bytes(request, encoding='utf-8'))
            time.sleep(0.2)
            recv_data = data_node_sock.recv(BUF_SIZE)
            data_node_sock.close()
            fp.write(str(recv_data, encoding='utf-8'))

Run

xxxxxxxxxx@thumm01:~/lab2/MyDFS$ python client.py -copyToLocal test_copyFromLocal.txt ./test/test_copyToLocal.txt
Request: get_fat_item test_copyFromLocal.txt
Fat:
blk_no,host_name,blk_size
0,['thumm05' 'thumm01' 'thumm06'],4096
1,['thumm03' 'thumm01' 'thumm06'],4096
2,['thumm05' 'thumm01' 'thumm06'],219

ls

name_node.py

def ls(self, dfs_path):
    local_path = os.path.join(NAME_NODE_DIR, dfs_path)
    # If the path does not exist, return an error message
    if not os.path.exists(local_path):
        return "No such file or directory: {}".format(dfs_path)

    if os.path.isdir(local_path):
        # If the target is a directory, list its contents
        dirs = os.listdir(local_path)
        response = " ".join(dirs)
    else:
        # If the target is a file, show its FAT table
        with open(local_path) as f:
            response = f.read()
    return response

client.py

The client sends a request to the NameNode to list the file or directory information under dfs_path.

  • Send a request to the NameNode to query the files or directories under dfs_path
def ls(self, dfs_path):
    # Ask the NameNode for the file or directory information under dfs_path
    request = "ls {} ".format(dfs_path)
    self.name_node_sock.send(bytes(request, encoding='utf-8'))
    fat_pd = self.name_node_sock.recv(BUF_SIZE)
    fat_pd = str(fat_pd, encoding='utf-8')
    print("file: \n{}".format(fat_pd))

Run

After uploading two files into the test_ls directory of the DFS, test ls by:

  • Listing a directory
  • Listing an existing file
  • Listing a non-existent file
xxxxxxxxxx@thumm01:~/lab2/MyDFS$ python client.py -format
format
Format namenode successfully~
Format datanode successfully~
Format datanode successfully~
Format datanode successfully~
Format datanode successfully~
Format datanode successfully~
xxxxxxxxxx@thumm01:~/lab2/MyDFS$ python client.py -copyFromLocal ./test/test_copyToLocal.txt test_ls/test_ls0.txt
File size: 8411
Request: new_fat_item test_ls/test_ls0.txt 8411
Fat:
blk_no,host_name,blk_size
0,['thumm01' 'thumm03' 'thumm05'],4096
1,['thumm03' 'thumm01' 'thumm05'],4096
2,['thumm04' 'thumm01' 'thumm06'],219

xxxxxxxxxx@thumm01:~/lab2/MyDFS$ python client.py -copyFromLocal ./test/test_copyToLocal.txt test_ls/test_ls0.txt
File size: 8411
Request: new_fat_item test_ls/test_ls0.txt 8411
Fat:
blk_no,host_name,blk_size
0,['thumm04' 'thumm01' 'thumm06'],4096
1,['thumm06' 'thumm05' 'thumm04'],4096
2,['thumm05' 'thumm06' 'thumm01'],219

xxxxxxxxxx@thumm01:~/lab2/MyDFS$ python client.py -ls test_ls
file:
test_ls0.txt
xxxxxxxxxx@thumm01:~/lab2/MyDFS$ python client.py -ls NoExisted.txt
file:
No such file or directory: NoExisted.txt
xxxxxxxxxx@thumm01:~/lab2/MyDFS$ python client.py -ls dir/test_ls0.txt
file:
No such file or directory: dir/test_ls0.txt

rm

rm: delete the file at the given DFS path.

name_node.py

def rm_fat_item(self, dfs_path):
    local_path = NAME_NODE_DIR + dfs_path
    response = pd.read_csv(local_path)
    os.remove(local_path)
    return response.to_csv(index=False)

data_node.py

def rm(self, dfs_path):
    local_path = os.path.join(DATA_NODE_DIR, dfs_path)
    rm_command = "rm -rf " + local_path
    os.system(rm_command)

client.py

  • Obtain the file's FAT table from the NameNode; after handing it out, the NameNode deletes the corresponding record
  • Print the FAT table
  • According to the FAT table, tell each target DataNode to delete its data blocks
def rm(self, dfs_path):
    request = "rm_fat_item {}".format(dfs_path)
    print("Request: {}".format(request))
    # Obtain the file's FAT table from the NameNode (which then deletes its record),
    # print it, then tell every target DataNode to delete its data block
    self.name_node_sock.send(bytes(request, encoding='utf-8'))
    fat_pd = self.name_node_sock.recv(BUF_SIZE)
    fat_pd = str(fat_pd, encoding='utf-8')
    # fat_pd = str(self.name_node_sock.recv(BUF_SIZE), encoding='utf-8')
    print("Fat: \n{}".format(fat_pd))
    fat = pd.read_csv(StringIO(fat_pd))

    for idx, row in fat.iterrows():
        datanode_hosts = re.findall(r"'(.+?)'", row['host_name'])
        blk_path = dfs_path + '.blk{}'.format(row['blk_no'])
        for host in datanode_hosts:
            data_node_sock = socket.socket()
            data_node_sock.connect((host, DATA_NODE_PORT))
            request = "rm {}".format(blk_path)
            data_node_sock.send(bytes(request, encoding='utf-8'))
            recv_data = data_node_sock.recv(BUF_SIZE)
            print(str(recv_data, encoding='utf-8'))
            data_node_sock.close()

Run

The result is shown below; the corresponding files under ./dfs/name and ./dfs/data are deleted.

xxxxxxxxxx@thumm01:~/lab2/MyDFS$ python client.py -copyFromLocal ./test/test_copyToLocal.txt test_rm/test_rm0.txt
File size: 8411
Request: new_fat_item test_rm/test_rm0.txt 8411
Fat:
blk_no,host_name,blk_size
0,['thumm03' 'thumm06' 'thumm05'],4096
1,['thumm01' 'thumm06' 'thumm05'],4096
2,['thumm01' 'thumm05' 'thumm04'],219

xxxxxxxxxx@thumm01:~/lab2/MyDFS$ python client.py -rm test_rm/test_rm0.txt
Request: rm_fat_item test_rm/test_rm0.txt
Fat:
blk_no,host_name,blk_size
0,['thumm03' 'thumm06' 'thumm05'],4096
1,['thumm01' 'thumm06' 'thumm05'],4096
2,['thumm01' 'thumm05' 'thumm04'],219

Remove chunk ./dfs/data/test_rm/test_rm0.txt.blk0 successfully~
Remove chunk ./dfs/data/test_rm/test_rm0.txt.blk0 successfully~
Remove chunk ./dfs/data/test_rm/test_rm0.txt.blk0 successfully~
Remove chunk ./dfs/data/test_rm/test_rm0.txt.blk1 successfully~
Remove chunk ./dfs/data/test_rm/test_rm0.txt.blk1 successfully~
Remove chunk ./dfs/data/test_rm/test_rm0.txt.blk1 successfully~
Remove chunk ./dfs/data/test_rm/test_rm0.txt.blk2 successfully~
Remove chunk ./dfs/data/test_rm/test_rm0.txt.blk2 successfully~
Remove chunk ./dfs/data/test_rm/test_rm0.txt.blk2 successfully~
xxxxxxxxxx@thumm01:~/lab2/MyDFS$ ls dfs/data/test_rm/
xxxxxxxxxx@thumm01:~/lab2/MyDFS$

data replication

Currently DFS_REPLICATION in common.py is 1, meaning each data block is stored on only one host. In practice, for reliability, each block is stored on several hosts. Modify DFS_REPLICATION and HOST_LIST, along with the corresponding parts of name_node.py, data_node.py, and client.py, to implement multi-replica block storage. DFS_REPLICATION = 3 is recommended, tested on 5 machines.

The code and run results above already use data replication.
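For reference, a minimal sketch of the relevant constants in common.py with three-way replication across the five lab hosts. Only DFS_REPLICATION, HOST_LIST, DFS_BLK_SIZE and the dfs directories are grounded in the code and output above; the remaining values are illustrative placeholders.

# common.py (sketch) -- shared configuration used by name_node.py, data_node.py and client.py
DFS_REPLICATION = 3                                   # each block is stored on 3 DataNodes
HOST_LIST = ['thumm01', 'thumm03', 'thumm04', 'thumm05', 'thumm06']
DFS_BLK_SIZE = 4096                                   # block size in bytes
NAME_NODE_DIR = './dfs/name'                          # where the NameNode keeps FAT tables
DATA_NODE_DIR = './dfs/data'                          # where each DataNode keeps its blocks
DATA_NODE_PORT = 11010                                # illustrative port number
BUF_SIZE = 4096 * 2                                   # illustrative socket buffer size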

python client.py -copyFromLocal ./test/test_copyToLocal.txt test_replication/0.txt

As the screenshot shows, the VS Code editor pane confirms that thumm01 holds blocks 1 and 2, the top-left tmux pane shows that thumm03 holds blocks 0 and 2, and so on; the other nodes indeed hold their assigned blocks, and the contents of every block are correct.

Sparse Matrix Multiplication with MapReduce

The data "fracture" problem

In the earlier code, when uploading a file the client sends the total file size to the name_node, and the name_node then splits the file into blocks of a fixed number of bytes. This causes a serious problem:
splitting purely by byte count can "fracture" the data, so a record that straddles a block boundary gets cut into two blocks.

Since the name_node cannot access the original file, the only fix is to have the client pre-split the file: the client keeps each block as close to DFS_BLK_SIZE as possible while ensuring every record inside a block is complete, and then uploads the list of per-block sizes to the name_node.

However, if a single record is larger than DFS_BLK_SIZE, the handling above breaks down and that record cannot be placed into the DFS properly.
A distributed system should of course be designed to handle every case, but I am taking a shortcut here :(

client computes the block sizes

def preCalculateFileSize(self, local_file):
    blk_size_list = []
    cur_blk_size = 0
    with open(local_file, "rb") as f:
        # Read in binary mode: in text mode len('™') is 1, but it occupies 3 bytes in UTF-8
        for line in f.readlines():
            if (cur_blk_size + len(line)) > DFS_BLK_SIZE:
                blk_size_list.append(cur_blk_size)
                cur_blk_size = len(line)
            else:
                cur_blk_size += len(line)
    if cur_blk_size:
        blk_size_list.append(cur_blk_size)
    return blk_size_list

Issue log: reading by byte vs. reading by character

Problem: the size reported by os.path.getsize and the total character count obtained by reading line by line do not match.
Note: each line that is read includes the trailing newline \n, which len() also counts; in my test it takes one byte.

>>> os.path.getsize("test.txt")
8411
>>> s = 0
>>> f = open("test.txt", "r")
>>> for line in f.readlines():
... s += len(line)
...
...
>>> s
8399

Testing shows that newlines (\n), indentation, and spaces do not cause any counting problem, so what actually makes the lengths differ?

The cause is non-ASCII characters: '™', for example, counts as a single character under len() in text mode but occupies 3 bytes (\xe2\x84\xa2) in UTF-8, which is why each occurrence makes the byte count larger than the character count.

>>> f1 = open("test.txt")
>>> f2 = open("test.txt", "rb")
>>> ls1 = f1.readlines()
>>> ls2 = f2.readlines()
>>> for i in range(len(ls1)):
... if (len(ls1[i]) != len(ls2[i])):
... print(f"len(ls1[i]) = {len(ls1[i])} len(ls2[i]) = {len(ls2[i])}")
... print(ls1[i], end='')
... print(ls2[i])
...
...
...
len(ls1[i]) = 107 len(ls2[i]) = 109
Although the Hadoop framework is implemented in Java™, MapReduce applications need not be written in Java.
b'Although the Hadoop framework is implemented in Java\xe2\x84\xa2, MapReduce applications need not be written in Java.\n'
len(ls1[i]) = 96 len(ls2[i]) = 98
Hadoop Pipes is a SWIG-compatible C++ API to implement MapReduce applications (non JNI™ based).
b'Hadoop Pipes is a SWIG-compatible C++ API to implement MapReduce applications (non JNI\xe2\x84\xa2 based).\n'
len(ls1[i]) = 94 len(ls2[i]) = 98
Here, myarchive.zip will be placed and unzipped into a directory by the name “myarchive.zip”.
b'Here, myarchive.zip will be placed and unzipped into a directory by the name \xe2\x80\x9cmyarchive.zip\xe2\x80\x9d.\n'
len(ls1[i]) = 219 len(ls2[i]) = 223
Here, the files dir1/dict.txt and dir2/dict.txt can be accessed by tasks using the symbolic names dict1 and dict2 respectively. The archive mytar.tgz will be placed and unarchived into a directory by the name “tgzdir”.
b'Here, the files dir1/dict.txt and dir2/dict.txt can be accessed by tasks using the symbolic names dict1 and dict2 respectively. The archive mytar.tgz will be placed and unarchived into a directory by the name \xe2\x80\x9ctgzdir\xe2\x80\x9d.\n'

Takeaway: for operations like this, always read in binary (byte) mode.
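A quick check of the difference in the interpreter:

>>> s = "Java™"
>>> len(s)                      # text mode: '™' counts as one character
5
>>> len(s.encode('utf-8'))      # binary mode: '™' is the 3 bytes \xe2\x84\xa2
7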

conn_helper.py: packet design

As seen in the previous section, the objects being transmitted are not limited to strings but also include lists and other structures; in addition, both the sender's and the receiver's buffers are limited, so it is worth writing dedicated communication helper functions.

Many approaches found online run into trouble with large payloads and fail with EOFError: Ran out of input. As the screenshot showed, the sender transmitted 565550 bytes; the receiver sometimes recovered the complete data (in the previous run in the lower-right terminal pane, recv_data was reconstructed) and sometimes failed (in this run only 43776 bytes arrived).

Curiously, every failed run receives a fixed number of bytes: 43776 in test_client.py and 14480 in client.py. Searching online did not turn up a useful explanation.
My networking background is too basic to analyze the TCP connection properly; something to study later :)

The solution adopted here is to serialize the object with json.dumps and prepend a 4-byte field that stores the payload length.

To keep things simple, send_obj / receive_obj are used only where lists, dicts, and other structured objects are transmitted; the code that transmits plain strings is left unchanged.

import struct
import json

def send_msg(conn, msg):
    # Prefix the payload with its length in a fixed 4-byte field
    conn.sendall(struct.pack('I', len(msg)) + msg)

def send_obj(conn, msg):
    send_msg(conn, json.dumps(msg).encode())

def receive_msg(conn):
    msg_len = struct.unpack('I', conn.recv(4))[0]
    ret = b''
    while len(ret) < msg_len:
        msg = conn.recv(msg_len - len(ret))
        if not msg:  # connection closed before the full message arrived
            return None
        ret = ret + msg
    return ret

def receive_obj(conn):
    return json.loads(receive_msg(conn).decode())
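A small usage sketch over a local socket pair, assuming the functions above are saved as conn_helper.py:

import socket
from conn_helper import send_obj, receive_obj

# socket.socketpair() returns two already-connected sockets, enough to exercise the framing locally
a, b = socket.socketpair()
send_obj(a, {'blk_no': 0, 'hosts': ['thumm01', 'thumm03', 'thumm05']})
print(receive_obj(b))   # {'blk_no': 0, 'hosts': ['thumm01', 'thumm03', 'thumm05']}
a.close()
b.close()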

client.py: dispatching Map tasks and running Reduce

In the MapReduce programming model, the abstract inputs and outputs of Map and Reduce are:

map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)

The client's MapReduce function both dispatches the Map tasks and runs Reduce; it can be broken into the following steps:

  1. CopyFromLocal
    The client pre-computes the size of each block and uploads the sizes to the name_node to obtain a FAT table.
  2. Dispatch the Map tasks; each DataNode acts as a Mapper, generates list(k: v) and returns it
    In practice what the DataNode returns is directly k : list(v), where each v is stored as a list such as ['a', j, val].
  3. Run Reduce
    The client only needs to merge the k : list(v) results coming from the data_nodes, i.e. group the values that share a key, before Reduce can run; for simplicity, Reduce is therefore executed on the client rather than being distributed.
  4. Produce the output
    The task requires the output to be sorted numerically by key1 = i, key2 = j.
def MapReduce(self, m, p, n, input_file, output_file):
    # 1. CopyFromLocal
    blk_size_list = self.preCalculateFileSize(input_file)
    request = "new_fat_item {} ".format(input_file)
    print("Request: {}".format(request))
    # Ask the NameNode for a FAT table
    self.name_node_sock.send(bytes(request, encoding='utf-8'))
    time.sleep(0.2)
    send_obj(self.name_node_sock, blk_size_list)
    fat_pd = self.name_node_sock.recv(BUF_SIZE)
    # Print the FAT table and parse it with pandas
    fat_pd = str(fat_pd, encoding='utf-8')
    print("Fat: \n{}".format(fat_pd))
    fat = pd.read_csv(StringIO(fat_pd))
    # Send each data block to its target DataNodes according to the FAT table
    fp = open(input_file)
    for idx, row in fat.iterrows():
        data = fp.read(int(row['blk_size']))
        datanode_hosts = re.findall(r"'(.+?)'", row['host_name'])  # extract host names with a regex
        for host in datanode_hosts:
            data_node_sock = socket.socket()
            data_node_sock.connect((host, DATA_NODE_PORT))
            blk_path = input_file + ".blk{}".format(row['blk_no'])
            request = "store {}".format(blk_path)
            data_node_sock.send(bytes(request, encoding='utf-8'))
            time.sleep(0.2)  # wait between the two sends to avoid TCP packet sticking
            data_node_sock.send(bytes(data, encoding='utf-8'))
            data_node_sock.close()
    fp.close()
    print("Finish CopyFromLocal")

    # 2. Dispatch the Map tasks: each DataNode acts as a Mapper and generates list(k: v),
    #    but what actually comes back is k : list(v)
    # Assign one Map task per block according to the FAT table
    key_valuelist = dict()
    fat = pd.read_csv(StringIO(fat_pd))
    for idx, row in fat.iterrows():
        datanode_hosts = re.findall(r"'(.+?)'", row['host_name'])
        data_node_sock = socket.socket()
        data_node_sock.connect((datanode_hosts[0], DATA_NODE_PORT))
        blk_path = input_file + ".blk{}".format(row['blk_no'])
        request = "map {} {} {}".format(blk_path, m, n)
        data_node_sock.send(bytes(request, encoding='utf-8'))
        key_values = receive_obj(data_node_sock)
        for k in key_values.keys():
            if k not in key_valuelist.keys():
                key_valuelist[k] = key_values[k]
            else:
                key_valuelist[k] += key_values[k]
    print("Finish Reduce")

    # 3. Run Reduce
    ij_matrix = dict()
    for key, valuelist in key_valuelist.items():
        a_idx_val = {}
        b_idx_val = {}
        for mat_idx_val in valuelist:
            mat, idx, val = mat_idx_val[0], mat_idx_val[1], mat_idx_val[2]
            if (mat == 'a'):
                a_idx_val[idx] = val
            else:
                b_idx_val[idx] = val
        res_element_val = 0
        for idx in a_idx_val.keys():
            if idx in b_idx_val.keys():
                res_element_val += a_idx_val[idx] * b_idx_val[idx]

        if res_element_val != 0:
            ij_matrix[key] = res_element_val

    # 4. Produce the output
    with open(output_file, "w") as of:
        # Sort numerically by key1 = i, key2 = j
        for ij in sorted(ij_matrix, key=lambda ij: [int(x) for x in ij.split(',')]):
            of.write("{},{}\n".format(ij, ij_matrix[ij]))
    print("Finish writing to output file.")

  1. When merging k : list(v) coming from different Mappers, it is the lists that are merged, using +=.

  2. Sorting the keys numerically can be written as: for ij in sorted(ij_matrix, key=lambda ij: [int(x) for x in ij.split(',')]) (see the example below)
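A quick illustration of note 2: sorting the "i,j" keys as plain strings would put "10,2" before "2,3", so the keys are converted to integer pairs first.

>>> keys = ['10,2', '2,10', '2,3']
>>> sorted(keys)
['10,2', '2,10', '2,3']
>>> sorted(keys, key=lambda ij: [int(x) for x in ij.split(',')])
['2,3', '2,10', '10,2']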

The map function in data_node.py

A dict cannot hold several different values under the same key. Since the final goal is k : list(v) anyway, we skip generating list(k : v) first and build k : list(v) directly.

def map(self, dfs_path, m, n):
    local_path = os.path.join(DATA_NODE_DIR, dfs_path)
    key_valuelist = dict()
    for line in open(local_path):
        line_4parts = line.split(',')
        mat, i, j, val = line_4parts[0], int(line_4parts[1]), int(line_4parts[2]), int(line_4parts[3])
        if mat == 'A':
            # A[i][j] contributes to every output element C[i][k], k = 1..n
            for k in range(1, n + 1):
                str_i_k = "{},{}".format(i, k)
                if str_i_k not in key_valuelist.keys():
                    key_valuelist[str_i_k] = [['a', j, val]]
                else:
                    key_valuelist[str_i_k].append(['a', j, val])
        elif mat == 'B':
            # B[i][j] contributes to every output element C[k][j], k = 1..m
            for k in range(1, m + 1):
                str_k_j = "{},{}".format(k, j)
                if str_k_j not in key_valuelist.keys():
                    key_valuelist[str_k_j] = [['b', i, val]]
                else:
                    key_valuelist[str_k_j].append(['b', i, val])

    return key_valuelist
  1. A list such as [i, k] is not hashable, so it is formatted into a string and used as the key (see the example below).
  2. When building k : list(v) and the same key k appears again, the new v is added to the existing list with append.
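A quick illustration of note 1:

>>> d = {}
>>> d[[1, 2]] = 'x'
Traceback (most recent call last):
  ...
TypeError: unhashable type: 'list'
>>> d["{},{}".format(1, 2)] = 'x'
>>> d
{'1,2': 'x'}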

Testing and validation

Validation on the provided example

xxxxxxxxxx@thumm01:~/lab2/MyDFS$ python client.py -MapReduce test/input_4_3_2_.txt test/output_4_3_2_.txt
4 test/output_4_3_2_.txt
Request: new_fat_item test/input_4_3_2_.txt
Fat:
blk_no,host_name,blk_size
0,['thumm01' 'thumm03' 'thumm05'],133

Finish CopyFromLocal
Finish Reduce
Finish writing to output file.

Validation on random matrices

generate_matrix.py: generate random matrices and save them in sparse form
import numpy as np

def reslove_as_sparse_matrix(which_mat, mat, f):
    print(which_mat, mat.shape[0], mat.shape[1])
    n1, n2 = mat.shape[0], mat.shape[1]
    for i in range(n1):
        for j in range(n2):
            if (mat[i][j] != 0):
                f.write("{},{},{},{}\n".format(which_mat, i + 1, j + 1, mat[i][j]))

m_p_n = np.random.randint(low=100, high=200, size=3)
print(m_p_n)
m, p, n = m_p_n[0], m_p_n[1], m_p_n[2]
with open("test/input_{}_{}_{}_.txt".format(m, p, n), "w") as f:
    mask_A = np.where(np.random.rand(m, p) < 0.1, 1, 0)
    mat_A = np.random.randint(-0x3f3f, 0x3f3f, (m, p))
    mat_A = mask_A * mat_A
    reslove_as_sparse_matrix('A', mat_A, f)

    mask_B = np.where(np.random.rand(p, n) < 0.1, 1, 0)
    mat_B = np.random.randint(-0x3f3f, 0x3f3f, (p, n))
    mat_B = mask_B * mat_B
    reslove_as_sparse_matrix('B', mat_B, f)
process_on_single_node.py: single-machine matrix multiplication for verification
import sys
import numpy as np

argv = sys.argv
argc = len(argv) - 1

input_file = argv[1]
input_file_parts = input_file.split('_')
m, p, n = int(input_file_parts[1]), int(input_file_parts[2]), int(input_file_parts[3])
mat_A, mat_B = np.zeros((m, p), dtype=int), np.zeros((p, n), dtype=int)
# Resolve the input into numpy arrays
for line in open(input_file):
    line_4_parts = line.split(',')
    which_mat, i, j, val = line_4_parts[0], int(line_4_parts[1]), int(line_4_parts[2]), int(line_4_parts[3])
    if which_mat == 'A':
        mat_A[i - 1][j - 1] = val
    elif which_mat == 'B':
        mat_B[i - 1][j - 1] = val


mat_C = mat_A.dot(mat_B)
n1, n2 = mat_C.shape[0], mat_C.shape[1]
# Write the output to a file in sparse form
with open("test/output_single_{}_{}_{}_.txt".format(m, p, n), "w") as f:
    for i in range(n1):
        for j in range(n2):
            if (mat_C[i][j] != 0):
                f.write("{},{},{}\n".format(i + 1, j + 1, mat_C[i][j]))

Comparison
xxxxxxxxxx@thumm01:~/lab2/MyDFS$ python client.py -MapReduce test/input_111_182_134_.txt test/output_111_182_134_.txt
165 test/output_165_147_144_.txt
Request: new_fat_item test/input_165_147_144_.txt
Fat:
blk_no,host_name,blk_size
0,['thumm06' 'thumm05' 'thumm04'],4089
1,['thumm06' 'thumm01' 'thumm05'],4085
2,['thumm05' 'thumm03' 'thumm04'],4092
3,['thumm05' 'thumm06' 'thumm04'],4089
4,['thumm03' 'thumm04' 'thumm01'],4095
5,['thumm05' 'thumm03' 'thumm04'],4087
6,['thumm05' 'thumm06' 'thumm03'],4096
7,['thumm01' 'thumm03' 'thumm06'],4092
8,['thumm01' 'thumm04' 'thumm03'],4091
9,['thumm04' 'thumm06' 'thumm01'],4087
10,['thumm06' 'thumm01' 'thumm04'],4091
11,['thumm05' 'thumm03' 'thumm04'],4085
12,['thumm05' 'thumm06' 'thumm01'],4085
13,['thumm01' 'thumm04' 'thumm06'],4091
14,['thumm01' 'thumm04' 'thumm06'],4096
15,['thumm01' 'thumm06' 'thumm04'],3286

Finish CopyFromLocal
Finish Reduce
Finish writing to output file.


xxxxxxxxxx@thumm01:~/lab2/MyDFS$ python process_on_single_node.py test/input_111_182_134_.txt

As the screenshot showed, the two results are identical.

Bonus

Q: Compare the runtime performance of single-machine matrix multiplication with that of your own MapReduce implementation, and explain why the results come out this way.

Comparing the runtimes with time:

xxxxxxxxxx@thumm01:~/lab2/MyDFS$ time python client.py -MapReduce test/input_111_182_134_.txt test/output_111_182_134_.txt
111 test/output_111_182_134_.txt
Request: new_fat_item test/input_111_182_134_.txt
Fat:
blk_no,host_name,blk_size
0,['thumm06' 'thumm01' 'thumm03'],4087
1,['thumm03' 'thumm05' 'thumm06'],4094
2,['thumm01' 'thumm05' 'thumm03'],4090
3,['thumm06' 'thumm04' 'thumm01'],4091
4,['thumm05' 'thumm04' 'thumm03'],4084
5,['thumm06' 'thumm05' 'thumm03'],4085
6,['thumm01' 'thumm04' 'thumm06'],4087
7,['thumm01' 'thumm04' 'thumm03'],4094
8,['thumm01' 'thumm03' 'thumm06'],4086
9,['thumm06' 'thumm03' 'thumm05'],4090
10,['thumm04' 'thumm03' 'thumm05'],4089
11,['thumm04' 'thumm05' 'thumm03'],4091
12,['thumm06' 'thumm01' 'thumm04'],4093
13,['thumm04' 'thumm06' 'thumm01'],4092
14,['thumm01' 'thumm06' 'thumm03'],4090
15,['thumm04' 'thumm05' 'thumm03'],2781

Finish CopyFromLocal
Finish Reduce
Finish writing to output file.

real 0m16.871s
user 0m2.780s
sys 0m0.988s
xxxxxxxxxx@thumm01:~/lab2/MyDFS$ time python process_on_single_node.py test/input_111_182_134_.txt

real 0m0.259s
user 0m0.692s
sys 0m0.768s

The MapReduce version takes roughly 60 times as long as the single-machine program (about 16.9 s vs. 0.26 s). The reason is that although running Map on several machines gains some parallelism, the network communication with the name_node and the data_nodes is the real performance bottleneck.

Both the MapReduce program and the single-machine program in this lab still leave plenty of room for optimization.
In MapReduce, the client could dispatch the Map tasks with multiple processes or threads, as sketched below.
The single-machine program could use blocked (tiled) matrix multiplication for cache-friendly code, and could compute the product with multi-threaded CPU code or on a GPU.
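For the first point, a minimal sketch of dispatching the per-block Map requests concurrently with a thread pool. This is not part of the current client.py: map_one_block is a hypothetical helper wrapping the per-block socket code shown earlier, and it assumes DATA_NODE_PORT can be imported from common.py and receive_obj from conn_helper.py.

import socket
from concurrent.futures import ThreadPoolExecutor

from conn_helper import receive_obj      # length-prefixed receive helper from above
from common import DATA_NODE_PORT        # assumed to hold the DataNode port

def map_one_block(host, blk_path, m, n):
    # Connect to one DataNode, ask it to run map() on a single block,
    # and return the k -> list(v) dictionary it produces
    sock = socket.socket()
    sock.connect((host, DATA_NODE_PORT))
    sock.send("map {} {} {}".format(blk_path, m, n).encode())
    result = receive_obj(sock)
    sock.close()
    return result

def dispatch_map_tasks(tasks, m, n):
    # tasks: list of (host, blk_path) pairs taken from the FAT table
    key_valuelist = {}
    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = [pool.submit(map_one_block, host, blk_path, m, n)
                   for host, blk_path in tasks]
        for fut in futures:
            for k, vs in fut.result().items():
                key_valuelist.setdefault(k, []).extend(vs)
    return key_valuelist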