帆子
作者帆子·2021-04-27 10:28
售前技术支持·国内某服务器生产商

在IBM i上实现作业之间的通讯与阻塞唤醒

字数 7125阅读 1661评论 0赞 0

先前介绍了 Unnamed Semaphore 在协调线程使用共享资源方面的作用,本文将结合实例,介绍一下如何在 IBM i 上通过 Data Queue 和 Named Samaphore 实现作业之间的通讯和阻塞唤醒。这其实也是在介绍 Named Semaphore (下文简称为 NS )在协调作业使用共享资源方面的作用。

该实例涉及两个作业,作业 A 和作业 B ,它们分别调用 SENDR 和 RECVR 这两个程序。在涉及程序代码之前,让我们先看一下它的流程图。(左边的是 SENDR 的流程图,右边的是 RECVR 的流程图)

下面我们就来看看具体的程序代码。

这是 SENDR 程序:

#include <stdio.h>
#include <qsnddtaq.h>
#include <semaphore.h>
#include <qp0z1170.h>

#define MSGLEN 30

static char dtaq[10];
static char qlib[10];
static decimal(5,0) leng=MSGLEN;

int clear_dtaq(void)
{
  char cmd[60];

  memset(cmd, 0x00, sizeof(cmd));
  sprintf(cmd, "CALL PGM(QCLRDTAQ) PARM('%10s' '%10s')", dtaq, qlib);

  if (Qp0zSystem(cmd)==0)
    return(0);
  else
    return(1);
}

int send_dtaq(char str[MSGLEN])
{
  QSNDDTAQ( dtaq,                 /* Data queue name          */
            qlib,                 /* Library name             */
            leng,                 /* Length of data           */
            str);                 /* Data                     */
  return 0;
}

main() {

  sem_t * sem_sendr;
  sem_t * sem_recvr;
  int rc;
  char str[MSGLEN];

  #define mysendr "/mysendr"
  #define myrecvr "/myrecvr"

  rc = sem_unlink(mysendr);
  rc = sem_unlink(myrecvr);

  sem_sendr = sem_open(mysendr,
                     O_CREAT, S_IRUSR | S_IWUSR, 0);
  sem_recvr = sem_open(myrecvr,
                     O_CREAT, S_IRUSR | S_IWUSR, 0);

  memset(str,  0x00, sizeof(str));
  memcpy(dtaq, "DTAQ01    ",  10);
  memcpy(qlib, "OLIVER    ",  10);

  rc = clear_dtaq();

  while(rc==0) {

    sem_wait(sem_recvr);
    gets(str);
    rc=send_dtaq(str);
    sem_post(sem_sendr);
    if(strcmp(str,".")==0) break;

  }

  rc = sem_unlink(mysendr);
  rc = sem_unlink(myrecvr);

  sem_close(sem_sendr);
  sem_close(sem_recvr);

}

这是 RECVR 程序:

#include <stdio.h>
#include <qsnddtaq.h>
#include <semaphore.h>
#include <qp0z1170.h>

#define MSGLEN 30

static char dtaq[10];
static char qlib[10];
static decimal(5,0) leng=MSGLEN;

int clear_dtaq(void)
{
  char cmd[60];

  memset(cmd, 0x00, sizeof(cmd));
  sprintf(cmd, "CALL PGM(QCLRDTAQ) PARM('%10s' '%10s')", dtaq, qlib);

  if (Qp0zSystem(cmd)==0)
    return(0);
  else
    return(1);
}

int send_dtaq(char str[MSGLEN])
{
  QSNDDTAQ( dtaq,                 /* Data queue name          */
            qlib,                 /* Library name             */
            leng,                 /* Length of data           */
            str);                 /* Data                     */
  return 0;
}

main() {

  sem_t * sem_sendr;
  sem_t * sem_recvr;
  int rc;
  char str[MSGLEN];

  #define mysendr "/mysendr"
  #define myrecvr "/myrecvr"

  rc = sem_unlink(mysendr);
  rc = sem_unlink(myrecvr);

  sem_sendr = sem_open(mysendr,
                     O_CREAT, S_IRUSR | S_IWUSR, 0);
  sem_recvr = sem_open(myrecvr,
                     O_CREAT, S_IRUSR | S_IWUSR, 0);

  memset(str,  0x00, sizeof(str));
  memcpy(dtaq, "DTAQ01    ",  10);
  memcpy(qlib, "OLIVER    ",  10);

  rc = clear_dtaq();

  while(rc==0) {

    sem_wait(sem_recvr);
    gets(str);
    rc=send_dtaq(str);
    sem_post(sem_sendr);
    if(strcmp(str,".")==0) break;

  }

  rc = sem_unlink(mysendr);
  rc = sem_unlink(myrecvr);

  sem_close(sem_sendr);
  sem_close(sem_recvr);

}

在介绍程序的功能之前,我们先将这两段程序中所涉及的用于 NS 以及 Data Queue 的主要函数介绍一下:

1, int sem_unlink(const char *name);

其中, Name 是一个字符串常量。需要记住,该字符串常量须以 / 开头。

该函数的作用就是 解除一个 NS 的名称链接。该 NS 的名称将从名称集合中删除。如果该 NS 仍在使用中,该函数则不会删除该它,直到所有使用该 NS 的进程都已结束,或已调用了 sem_close() 。在 sem_unlink 后接着调用 sem_open() ,如果设置了 oflag 参数含有 O_CREAT 标志,则系统将会创建具有相同名称的新 NS 。

2, sem_t sem_open(const char name, int oflag, mode_t mode, unsigned int value);

其中, Name 是一个字符串常量。需要记住,该字符串常量须以 / 开头。

Oflag 是一个整型变量,代表着 Option Flag ,它有两个取值: O_CREAT 和 O_EXCL 。前者表明如果某个 NS 的名称不存在,则创建具有相同名称的新信号量。后者则是企图独占式打开某个 NS 的名称,也就是当具有某个名称的 NS 已经存在,则会返回打开失败。

M ode 也是一个整型变量,代表着 Permission Flag 。程序里所使用的两个参数: S_IRUSR 和 S_IWUSR ,分别代表着允许这个 NS 的创建者以可读的方式和可写的方式打开这个信号量。

Vallue 还是一个整型变量,代表着这一 NS 的初始值。和 Unnamed Semaphore 一样:
当 Value > 0 时,它代表着可用的共享资源的数量。
当 Value < 0 时,其绝对值代表着因未能获得共享资源而正处于被阻塞状态的作业的数量。
而当 Value = 0 时,它代表既没有可用的共享资源,也没有想要获得共享资源而处于阻塞状态的作业。

3 , int sem_wait(sem_t * sem);

其中, sem 是一个指针,指向系统中一个已经被创建了的 NS 。

该函数的作用是将 NS 的 Value 减 1 。该函数执行后,如果 NS 的 Value 大于等于 0 ,那么当前作业将会继续执行其下一条语句;反之,如果 NS 的 Value 变成小于 0 ,那么当前作业将被阻塞,直到被唤醒,才会继续执行其下一条语句。

4 , int sema_post(sem_t * sem);

其中, sem 是一个指针,指向系统中一个已经被创建了的 NS 。

该函数的作用是将 NS 的 Value 加 1 。该函数执行前,如果 NS 的 Value 小于 0 ,那么该函数将会去随机地唤醒一个正在处于被阻塞状态的作业。

5 , int sem_close(sem_t * sem);

其中, sem 是一个指针,指向 系统中一个已经被创建了的 NS 。

该函数的作用是关闭和 释放与该 NS 相关的系统资源。在 NS 被关闭后再试图去使用它将导致错误。当一个 NS 不再被使用时,它应该被关闭。如果之前对该 NS 执行过 sem_unlink() ,并且当前作业持有对该 NS 的最后一个引用,该函数则会将这个 NS 从系统中彻底删除。

6 , int clear_dtaq(), int send_dtaq(char str[]), int receive_dtaq(void *straddr)

这是一系列对于 Data Queue 的操作,包括:
(1) 清除 Data Queue 中的内容;
(2) 向特定 Data Queue 发送一个字符串;
(3) 从特定 Data Queue 接收一个字符串;

它们分别涉及一个 PGM 和两个 API : QCLRDTAQ , QSNDDTAQ , QRCVDTAQ 。相关程序也显示出如何在 C 程序中调用 PGM 和 API 。

程序中的这一特定 Data Queue ,其实就是一个普通的 Data Queue , OLIVER/DTAQ01 。它可以通过 CRTDTAQ 这个命令来创建,比如:

CRTDTAQ DTAQ(OLIVER/DTAQ01) MAXLEN(100) SEQ(*FIFO)

好了,现在让我们看看程序的具体流程:

作业 A 运行 SENDR 程序:

  • 作业 A 首先通过 sem_unlink 初始化两个 NS 的名字, mysendr 和 myrecvr 。
  • 然后通过 sem_open 创建并打开这两个 NS , mysendr 和 myrecvr 。它们的初始值均为 0 。同时,作业 A 会清空程序中所使用的 Data Queue 。
  • 接下来,作业 A 将阻塞式等待 NS myrecvr 的值大于 0 。
  • 一旦系统的这个 NS myrecvr 的值大于 0 ,它将唤醒作业 A ,并将 NS myrecvr 的值减 1 。
  • 作业 A 它会接收键盘输入,并将输入的字符串组成一条 Message ,并通过 Data Queue 发送出这条 Message 。同时将 NS mysendr 的值加 1 ,以表明 Message 已经发送。
  • 然后,作业 A 继续阻塞式等待 NS myrecvr 的值大于 0 。
  • 如此循环,直到所发送出的 Message 是一个句点,循环结束。
  • 收尾工作主要是释放引用并关闭作业 A 所打开使用的那两个 NS , mysendr 和 myrecvr 。

作业 B 运行 RECVR 程序:

  • 它首先通过 sem_open 来打开使用系统中的两个 NS , mysendr 和 myrecvr ,它们的初始值均为 0 。
  • 作业 B 会将 NS myrecvr 的值加 1 ,以表明 receiver 已准备好接收 Message 。
  • 接下来,作业 B 将阻塞式等待 NS mysendr 的值大于 0 。
  • 一旦系统的这个 NS myrecvr 的值大于 0 ,它将唤醒作业 B ,并将 NS mysendr 的值减 1 。
  • 作业 B 它会接收 Data Queue 里的那条 Message ,并通过 printf 进行输出。同时将 NS myrecvr 的值加 1 ,以继续表明已准备好接收 Message 。
  • 然后,作业 B 继续阻塞式等待 NS mysendr 的值大于 0 。
  • 如此循环,直到所接收到的 Message 是一个句点,循环结束。
  • 收尾工作主要是释放引用和关闭作业 B 所打开使用的那两个 NS , mysendr 和 myrecvr 。

现在,让我们来运行一下。

  • 首先我们在作业 A 里 CALL PGM(OLIVER/SENDR) ,它会阻塞式等待在 sem_wait(sem_recvr) 这条语句里;
  • 然后我们在作业 B 里 CALL PGM(OLIVER/RECVR) ,它会通过 sem_post(sem_recvr) 唤醒作业 A ,并阻塞式等待在 sem_wait(sem_sendr) 这条语句里;
  • 接着,被唤醒的作业 A 会提示我们输入一个字串,以回车符结尾。它会向特定的 Data Queue 里发送我们所输入的字串,然后通过通过 sem_post(sem_sendr) 唤醒作业 B ,并重新阻塞式等待在 sem_wait(sem_recvr) 这条语句里,除非当前字串是一个句点,进而循环结束。
  • 被唤醒的作业 B 会从特定的 Data Queue 里接收作业 A 所发送的字串,然后将它显示在屏幕上,并通过 sem_post(sem_recvr) 唤醒作业 A ,并重新阻塞式等待在 sem_wait(sem_sendr) 这条语句里,除非所收到的字串是一个句点,进而循环结束。
  • 一旦循环结束,两个作业都会进行释放引用和关闭信号量的工作,进而程序结束。

程序运行起来的效果就是,我们在作业 A 中打入的字串,会被作业 A 和作业 B 同时显示在其屏幕上,直到我们键入了一个句点作为程序的终结。

总结一下,在 IBM i 上,作业之间进行信息交互的方式很多,可以是文件, Data Area , Message Queue ,甚至是 User Queue 和 User Space 。本文仅列举了通过 Data Queue 的方式来达成两个作业之间的信息交互。一旦有信息交互,就意味这资源共享;一旦有资源共享,就意味着需要协调多个作业对该共享资源的访问。文件有文件锁,对象有对象锁,当然,我们还可以像本文那样,采用 NS ,也就是 Named Semaphore ,来协调两个作业对共享资源( Data Queue )的访问,这其实也是在协调两个并发作业的运行。正如我们所看到的,创建和使用 NS 非常简单便捷,而且程序的可读性强,是一种非常值得推荐的方法。

参考网站:

1 , Knowledge Center – Interprocess Communication APIs

https://www.ibm.com/docs/en/i/7.3?topic=ssw_ibm_i_73/apis/unix3.htm

2 , Knowledge Center – Object APIs

https://www.ibm.com/docs/en/i/7.3?topic=category-object-apis

如果觉得我的文章对您有用,请点赞。您的支持将鼓励我继续创作!

0

添加新评论0 条评论

Ctrl+Enter 发表

作者其他文章

相关文章

X社区推广