Moved Pstream/mpi into foam/, removed Pstream/

This commit is contained in:
Dominik Christ 2015-06-19 15:31:00 +01:00
parent f278a4f857
commit 4c23c3f9e8
25 changed files with 594 additions and 2177 deletions

View file

@ -17,9 +17,13 @@ wmakePrintBuild -check || /bin/rm -f foam/Make/$WM_OPTIONS/global.? 2>/dev/null
wmakeLnInclude foam
wmakeLnInclude meshTools
wmakeLnInclude OSspecific/$WM_OSTYPE
Pstream/Allwmake
wmake libo OSspecific/$WM_OSTYPE
set +x
echo
echo "Note: ignore spurious warnings about missing mpicxx.h headers"
set -x
wmake libso foam
# Decomposition methods needed by meshTools

View file

@ -1,22 +0,0 @@
#!/bin/sh
cd ${0%/*} || exit 1 # run from this directory
set -x
wmake libso dummy
case "$WM_MPLIB" in
*MPI* | MVAPICH*)
set +x
echo
echo "Note: ignore spurious warnings about missing mpicxx.h headers"
set -x
(WM_OPTIONS=${WM_OPTIONS}$WM_MPLIB; wmake libso mpi)
;;
#GAMMA)
# wmake libso gamma
# ;;
esac
# ----------------------------------------------------------------- end-of-file

View file

@ -1,101 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 3.2
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
Description
Read token and binary block from IPstream
\*---------------------------------------------------------------------------*/
#include "error.H"
#include "IPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::IPstream::IPstream
(
const commsTypes commsType,
const int fromProcNo,
const label bufSize,
streamFormat format,
versionNumber version
)
:
Pstream(commsType, bufSize),
Istream(format, version),
fromProcNo_(fromProcNo),
messageSize_(0)
{
notImplemented
(
"IPsream::IPstream"
"("
"const commsTypes,"
"const int fromProcNo,"
"const label bufSize,"
"streamFormat, versionNumber"
")"
);
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
int Foam::IPstream::read
(
const commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize
)
{
notImplemented
(
"IPstream::read"
"("
"const commsTypes,"
"const int fromProcNo,"
"char* buf,"
"const label bufSize"
")"
);
return 0;
}
void Foam::IPstream::waitRequests()
{}
bool Foam::IPstream::finishedRequest(const label)
{
notImplemented("IPstream::finishedRequest()");
return false;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// ************************************************************************* //

View file

@ -1,5 +0,0 @@
Pstream.C
IPread.C
OPwrite.C
LIB = $(FOAM_LIBBIN)/dummy/libPstream

View file

@ -1,80 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 3.2
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
Description
Write primitive and binary block from OPstream
\*---------------------------------------------------------------------------*/
#include "error.H"
#include "OPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::OPstream::~OPstream()
{
notImplemented("OPstream::~OPstream()");
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::OPstream::write
(
const commsTypes commsType,
const int toProcNo,
const char* buf,
const std::streamsize bufSize
)
{
notImplemented
(
"IPstream::write"
"("
"const commsTypes commsType,"
"const int fromProcNo,"
"char* buf,"
"const label bufSize"
")"
);
return false;
}
void Foam::OPstream::waitRequests()
{}
bool Foam::OPstream::finishedRequest(const label)
{
notImplemented("OPstream::finishedRequest()");
return false;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// ************************************************************************* //

View file

@ -1,65 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 3.2
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "PstreamReduceOps.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
void Foam::Pstream::addValidParOptions(HashTable<string>& validParOptions)
{}
bool Foam::Pstream::init(int& argc, char**& argv)
{
FatalErrorIn("Pstream::init(int& argc, char**& argv)")
<< "Trying to use the dummy Pstream library." << nl
<< "This dummy library cannot be used in parallel mode"
<< Foam::exit(FatalError);
return false;
}
void Foam::Pstream::exit(int errnum)
{
notImplemented("Pstream::exit(int errnum)");
}
void Foam::Pstream::abort()
{
notImplemented("Pstream::abort()");
}
void Foam::reduce(scalar&, const sumOp<scalar>&)
{}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// ************************************************************************* //

View file

@ -1,136 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 3.2
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
Description
Read token and binary block from IPstream
\*---------------------------------------------------------------------------*/
#include "IPstream.H"
#include "long.H"
#include "PstreamGlobals.H"
extern "C"
{
# include <linux/gamma/libgamma.h>
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
IPstream::IPstream
(
const commsTypes commsType,
const int fromProcNo,
const label bufSize,
streamFormat format,
versionNumber version
)
:
Pstream(commsType, bufSize),
Istream(format, version),
fromProcNo_(fromProcNo),
messageSize_(0)
{
// Blocking read.
setOpened();
setGood();
if (Pstream::debug)
{
Pout<< "IPstream::IPstream : Starting receive from " << fromProcNo_
<< " recvIndex:" << PstreamGlobals::recvIndex[fromProcNo_]
<< Foam::endl;
}
PstreamGlobals::gammaWait(fromProcNo_);
label ready = PstreamGlobals::consumeIndex[fromProcNo_];
messageSize_ = PstreamGlobals::recvBufLen[ready][fromProcNo_];
if (!bufSize)
{
if (Pstream::debug)
{
Pout<< "IPstream::IPstream : sizing buffer to " << messageSize_
<< endl;
}
buf_.setSize(messageSize_);
}
PstreamGlobals::copyReceive(fromProcNo_, buf_.begin(), buf_.size());
if (Pstream::debug)
{
Pout<< "IPstream::IPstream : Received " << messageSize_
<< " from " << fromProcNo_
<< Foam::endl;
}
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
label IPstream::read
(
const commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize
)
{
// Blocking read.
label messageSize;
if (Pstream::debug)
{
Pout<< "IPstream::read : Starting receive from " << fromProcNo
<< " recvIndex:" << PstreamGlobals::recvIndex[fromProcNo]
<< Foam::endl;
}
PstreamGlobals::gammaWait(fromProcNo);
messageSize = PstreamGlobals::copyReceive(fromProcNo, buf, bufSize);
if (Pstream::debug)
{
Pout<< "IPstream::read : Received " << messageSize
<< " from " << fromProcNo
<< Foam::endl;
}
return messageSize;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* //

View file

@ -1,6 +0,0 @@
PstreamGlobals.C
Pstream.C
OPwrite.C
IPread.C
LIB = $(FOAM_MPI_LIBBIN)/libPstream

View file

@ -1,4 +0,0 @@
include $(RULES)/mplib$(WM_MPLIB)
EXE_INC = $(PFLAGS) $(PINC)
LIB_LIBS = $(PLIBS)

View file

@ -1,185 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 3.2
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
Description
Write primitive and binary block from OPstream gamma-mpi
\*---------------------------------------------------------------------------*/
#include "OPstream.H"
#include "long.H"
#include "PstreamGlobals.H"
extern "C" {
#include <linux/gamma/libgamma.h>
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// Largest message sent so far. This tracks the size of the receive
// buffer on the receiving end. Done so we only send out resize messages
// if necessary
//! @cond fileScope
labelList maxSendSize;
//! @endcond fileScope
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
OPstream::~OPstream()
{
if (Pstream::debug)
{
Pout<< "OPstream::~OPstream() to processor " << toProcNo_
<< Foam::endl;
}
if
(
!write
(
commsType_,
toProcNo_,
buf_.begin(),
bufPosition_
)
)
{
FatalErrorIn("OPstream::~OPstream()")
<< "GAMMA cannot send outgoing message"
<< Foam::abort(FatalError);
}
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool OPstream::write
(
const commsTypes commsType,
const int toProcNo,
const char* buf,
const std::streamsize bufSize
)
{
if (PstreamGlobals::getSizeFromHeader(buf, bufSize) != -1)
{
FatalErrorIn("OPstream::write")
<< "Problem: Trying to send message of size " << bufSize
<< " that corresponds to the special resizeMessage."
<< Foam::abort(FatalError);
}
if (maxSendSize.empty())
{
// Intialize maxSendSize to the initial size of the receive buffers.
maxSendSize.setSize(Pstream::nProcs());
maxSendSize = PstreamGlobals::initialBufferLen;
maxSendSize[Pstream::myProcNo()] = 0;
if (Pstream::debug)
{
forAll(maxSendSize, procNo)
{
Pout<< "OPstream::write() : for toProcNo:" << procNo
<< " set maxSendSize to " << maxSendSize[procNo]
<< Foam::endl;
}
}
}
if (Pstream::debug)
{
Pout<< "OPstream::write() : proc:" << toProcNo
<< " maxSendSize:" << maxSendSize[toProcNo]
<< Foam::endl;
}
if (bufSize > maxSendSize[toProcNo])
{
// Send resize message.
if (Pstream::debug)
{
Pout<< "OPstream::write() : Sending resize message to proc "
<< toProcNo
<< " for size:" << bufSize
<< Foam::endl;
}
PstreamGlobals::setResizeMessage(bufSize);
gamma_send_flowctl
(
toProcNo,
reinterpret_cast<char*>(PstreamGlobals::resizeMessage),
PstreamGlobals::resizeMessageLen*sizeof(uint64_t)
);
maxSendSize[toProcNo] = bufSize;
}
// Do normal send
// ~~~~~~~~~~~~~~
// Note: could be put into allocation of buf.
//gamma_mlock(const_cast<char*>(buf), bufSize);
if (Pstream::debug)
{
Pout<< "OPstream::write() : Sending to proc " << toProcNo
<< " bytes:" << bufSize << Foam::endl;
}
gamma_send_flowctl
(
toProcNo,
const_cast<char*>(buf),
bufSize
);
//gamma_munlock(const_cast<char*>(buf), bufSize);
if (Pstream::debug)
{
Pout<< "OPstream::write() : Sent " << bufSize
<< " to proc " << toProcNo
<< Foam::endl;
}
return true;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* //

View file

@ -1,474 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 3.2
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
Description
Pstream for GAMMA
GAMMA has a (polling) receive handler which gets called every time a
received message is complete. Ours stores the length of the currently
received message and sets up the next buffer to store the next message
in.
Note that the pattern between two processors can be
- send
- receive
- receive
- send
since the first swap might belong to a local exchange and the second to
a reduce. Since gamma has to have the receive buffers already set up we
have to allocate them big enough. To prevent excessive amounts needed we
dynamically resize them (never shrink) by sending special 'resize' messages
before sending a largish message.
Because of this we actually need four receive buffers:
- send
- receive resize message
- receive normal message
- receive resize message
- receive normal message
- send
The special resize message is a message with a special header which
(hopefully) should never appear in normal exchanges (it actually checks
for this in the OPstream::send)
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include "PstreamGlobals.H"
#include <cstring>
#include <cstdlib>
#include <csignal>
extern "C"
{
# include <linux/gamma/libgamma.h>
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// Receive handler to copy out received message length and switch buffers.
static void handler(void)
{
label current = PstreamGlobals::recvIndex[gamma_active_port];
List<char>& buf = PstreamGlobals::recvBuf[current][gamma_active_port];
label bufLen = PstreamGlobals::recvBufLen[current][gamma_active_port];
if (bufLen != -1)
{
FatalErrorIn("Pstream::handler(void)")
<< "Buffer length not reset : "
<< bufLen
<< " when receiving message of size " << gamma_msglen
<< " from processor " << gamma_active_port << endl
<< "This means that the existing data has not been consumed yet"
<< " (by IPstream::read) and means your communication pattern"
<< " is probably not balanced (a receive for every send)"
<< endl
<< "This can happen if you have e.g. gather without scatter."
<< endl
<< "A workaround is to increase the depth of the circular"
<< " receive buffers in PstreamGlobals.H"
<< abort(FatalError);
}
// Some checks
if
(
gamma_msglen < 0
|| gamma_msglen > buf.size()
)
{
FatalErrorIn("Pstream::handler(void)")
<< "Received message of size " << gamma_msglen
<< " from processor " << gamma_active_port
<< Foam::endl
<< "but global receive buffer is only of size "
<< buf.size()
<< abort(FatalError);
}
// Check for resize message
label resizeLen = PstreamGlobals::getSizeFromHeader
(
buf.begin(),
gamma_msglen
);
if (resizeLen != -1)
{
if (Pstream::debug)
{
Pout<< "Pstream::handler : Resize message:" << resizeLen
<< " from proc " << gamma_active_port
<< " current size:"
<< PstreamGlobals::getMaxBufSize(gamma_active_port)
<< Foam::endl;
}
// Saved current buffer.
List<char> savedBuf;
if (resizeLen > PstreamGlobals::getMaxBufSize(gamma_active_port))
{
if (Pstream::debug)
{
Pout<< "Pstream::handler :"
<< " resizing receive buffer for processor "
<< gamma_active_port
<< " from "
<< PstreamGlobals::getMaxBufSize(gamma_active_port)
<< " to " << resizeLen << Foam::endl;
}
// Save the pointer (that gamma knows about) so we can safely
// gamma_switch_to_buffer with a valid pointer.
// Not sure if nessecary but do anyway.
savedBuf.transfer(buf);
// Resize all the buffers
forAll(PstreamGlobals::recvBuf, i)
{
List<char>& chars =
PstreamGlobals::recvBuf[i][gamma_active_port];
// gamma_munlock(chars.begin(), chars.size());
chars.setSize(resizeLen);
// gamma_mlock(chars.begin(), chars.size());
}
}
// Update length with special value to denote resize was done.
PstreamGlobals::recvBufLen[current][gamma_active_port] = -2;
}
else
{
// Update length with actual message length
PstreamGlobals::recvBufLen[current][gamma_active_port] = gamma_msglen;
}
// Go to next buffer.
label next = PstreamGlobals::recvBuf.fcIndex(current);
PstreamGlobals::recvIndex[gamma_active_port] = next;
// gamma_switch_to_buffer
gamma_post_recv
(
gamma_active_port,
PstreamGlobals::recvBuf[next][gamma_active_port].begin(),
PstreamGlobals::recvBuf[next][gamma_active_port].size()
);
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
void Pstream::addValidParOptions(HashTable<string>& validParOptions)
{
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("machinefile", "machine file");
validParOptions.insert("GAMMANP", "numProcs");
validParOptions.insert("GAMMAHOME", "gamma cwd");
validParOptions.insert("GAMMA", "1(enable) or 0(disable)");
}
bool Pstream::init(int& argc, char**& argv)
{
int numprocs = 0;
string npString("-GAMMANP");
for (label i = 0; i < argc; i++)
{
if (argv[i] == npString)
{
if (i+1 < argc)
{
numprocs = atoi(argv[i+1]);
break;
}
}
}
// Initialize GAMMA
unsigned char smallNumprocs = numprocs;
gamma_init(smallNumprocs, argc, argv);
myProcNo_ = gamma_my_node();
// Make sure printing with prefix.
setParRun();
procIDs_.setSize(numprocs);
forAll(procIDs_, procNo)
{
procIDs_[procNo] = procNo;
}
// Allocate receive buffers.
// ~~~~~~~~~~~~~~~~~~~~~~~~~
// Make sure each receive buffer is at least large enough to receive
// the resize message.
// Current active buffer
PstreamGlobals::recvIndex.setSize(numprocs);
PstreamGlobals::recvIndex = 0;
PstreamGlobals::consumeIndex.setSize(numprocs);
PstreamGlobals::consumeIndex = 0;
forAll(PstreamGlobals::recvBuf, i)
{
PstreamGlobals::recvBufLen[i].setSize(numprocs);
PstreamGlobals::recvBufLen[i] = -1;
List<List<char> >& buffers = PstreamGlobals::recvBuf[i];
buffers.setSize(numprocs);
forAll(buffers, procNo)
{
if (procNo != myProcNo_)
{
buffers[procNo].setSize(PstreamGlobals::initialBufferLen);
// Acc. to gamma sources all buffers need to be in memory.
// Either locked or "write touched".
// gamma_mlock
// (
// buffers[procNo].begin(),
// buffers[procNo].size()
// );
}
}
}
// Lock the special resize message
// gamma_mlock
// (
// reinterpret_cast<char*>(PstreamGlobals::resizeMessage),
// PstreamGlobals::resizeMessageLen*sizeof(uint64_t)
// );
// Attach current receive buffers
forAll(procIDs_, procNo)
{
if (procNo != myProcNo_)
{
// Buffer index (always 0 at this point)
label current = PstreamGlobals::recvIndex[procNo];
// Current buffer for this processor.
List<char>& buf = PstreamGlobals::recvBuf[current][procNo];
gamma_set_active_port
(
procNo, //unsigned short port,
procNo, //unsigned short dest_node,
gamma_my_par_pid(), //unsigned char dest_par_pid,
myProcNo_, //unsigned short dest_port,
handler, //callback
procNo, //unsigned short semaphore,
GO_BACK, //unsigned char buffer_kind,
buf.begin(),
buf.size()
);
}
}
// Make sure all have allocated the ports (so set the receive buffers)
gamma_sync();
Info<< "GAMMA Pstream initialized with:" << nl
<< " floatTransfer : " << floatTransfer << nl
<< " nProcsSimpleSum : " << nProcsSimpleSum() << nl
<< " scheduledTransfer : " << Pstream::scheduledTransfer << nl
<< Foam::endl;
// Now that nprocs is known construct communication tables.
initCommunicationSchedule();
return true;
}
void Pstream::exit(int errnum)
{
// gamma_munlockall();
gamma_exit();
//gamma_abort();
}
void Pstream::abort()
{
Pout<< "**Pstream::abort()**" << endl;
// gamma_munlockall();
gamma_abort();
}
void reduce(scalar& Value, const sumOp<scalar>& bop)
{
if (!Pstream::parRun())
{
return;
}
if (Pstream::debug)
{
Pout<< "**entering Pstream::reduce for " << Value << Foam::endl;
}
if (Pstream::master())
{
for
(
int slave=Pstream::firstSlave();
slave<=Pstream::lastSlave();
slave++
)
{
scalar value;
if
(
!IPstream::read
(
slave,
reinterpret_cast<char*>(&value), // buf
sizeof(Value) // bufSize
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "IPstream::read failed"
<< Foam::abort(FatalError);
}
Value = bop(Value, value);
}
}
else
{
if
(
!OPstream::write
(
Pstream::masterNo(),
reinterpret_cast<const char*>(&Value), // buf
sizeof(Value), // bufSize
false // non-buffered
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "OPstream::write failed"
<< Foam::abort(FatalError);
}
}
if (Pstream::master())
{
for
(
int slave=Pstream::firstSlave();
slave<=Pstream::lastSlave();
slave++
)
{
if
(
!OPstream::write
(
slave,
reinterpret_cast<const char*>(&Value), // buf
sizeof(Value), // bufSize,
false // non-buffered
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "OPstream::write failed"
<< Foam::abort(FatalError);
}
}
}
else
{
if
(
!IPstream::read
(
Pstream::masterNo(),
reinterpret_cast<char*>(&Value), // buf
sizeof(Value) // bufSize
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "IPstream::read failed"
<< Foam::abort(FatalError);
}
}
if (Pstream::debug)
{
Pout<< "**exiting Pstream::reduce with " << Value << Foam::endl;
}
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* //

View file

@ -1,206 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 3.2
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "PstreamGlobals.H"
#include "IOstreams.H"
#include "Pstream.H"
extern "C" {
#include <linux/gamma/libgamma.h>
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
// Receive buffers
FixedList<List<List<char> >, 4> PstreamGlobals::recvBuf;
// Length of receive buffers
FixedList<labelList, 4> PstreamGlobals::recvBufLen;
labelList PstreamGlobals::recvIndex;
labelList PstreamGlobals::consumeIndex;
// These are all signalling nans and probably different from the ones that
// the fpu might ever generate.
uint64_t PstreamGlobals::resizeMessage[PstreamGlobals::resizeMessageLen] =
{
0x7ff7ffffffffffABllu,
0x7ff7ffffffffffCDllu,
0x7ff7ffffffffff12llu,
0x7ff7ffffffffff30llu,
0x7ff7ffffffffff19llu,
0x0000000000000000llu // this word gets overwritten with the length.
};
// Wrapper around gamma_wait
void PstreamGlobals::gammaWait(const label procNo)
{
// Last request. Block.
gamma_wait(procNo, 1);
// Currently unconsumed received message
label ready = PstreamGlobals::consumeIndex[procNo];
// Check received length
if (PstreamGlobals::recvBufLen[ready][procNo] == -2)
{
// Was resize message. Consume and rewait (is always followed by
// real message)
if (Pstream::debug)
{
Pout<< "PstreamGlobals::gammaWait : "
<< "Resize event. consumeIndex:" << ready
<< " Restarting receive from " << procNo << endl;
}
// Consume resize message
PstreamGlobals::recvBufLen[ready][procNo] = -1;
PstreamGlobals::consumeIndex[procNo] =
PstreamGlobals::recvBuf.fcIndex(ready);
// And rewait
gamma_wait(procNo, 1);
}
}
// Copies data from global receive buffer into buf.
label PstreamGlobals::copyReceive
(
const label procNo,
char* buf,
const label bufSize
)
{
// Get the ready buffer
label ready = consumeIndex[procNo];
// Actually received
label receivedLen = recvBufLen[ready][procNo];
if (Pstream::debug)
{
Pout<< "copyReceive : for proc " << procNo
<< " copying " << receivedLen << " bytes out of buffer " << ready
<< endl;
}
if (receivedLen < 0)
{
FatalErrorIn
(
"Pstream::copyReceive(const label, char*, const label)"
) << "Illegal message length "
<< receivedLen
<< " received from proc " << procNo << " into buffer " << ready
<< endl
<< "This is probably caused by receiving more than is actually"
<< " sent (e.g. gather without scatter)." << endl
<< abort(FatalError);
}
if (receivedLen > bufSize)
{
FatalErrorIn
(
"Pstream::copyReceive(const label, char*, const label)"
) << "buffer ("
<< bufSize
<< ") not large enough for incomming message ("
<< receivedLen << ')'
<< " received from proc " << procNo << " into buffer " << ready
<< abort(FatalError);
}
// Copy out of receive buffer
memcpy
(
buf,
recvBuf[ready][procNo].begin(),
receivedLen
);
// Release receive buffer
recvBufLen[ready][procNo] = -1;
// Go to next buffer to consume
consumeIndex[procNo] = recvBuf.fcIndex(ready);
return receivedLen;
}
// Checks whether an incoming message is a resize message. If not returns -1,
// otherwise returns size read from header.
label PstreamGlobals::getSizeFromHeader(const char* buf, const label len)
{
if (len != resizeMessageLen*sizeof(uint64_t))
{
return -1;
}
const uint64_t* dPtr = reinterpret_cast<const uint64_t*>(buf);
// Check all but the last word
for (label i = 0; i < resizeMessageLen-1; i++)
{
if (*dPtr++ != resizeMessage[i])
{
return -1;
}
}
return *reinterpret_cast<const label*>(dPtr);
}
void PstreamGlobals::setResizeMessage(const label len)
{
reinterpret_cast<label&>(resizeMessage[resizeMessageLen-1]) = len;
}
label PstreamGlobals::getMaxBufSize(const int procNo)
{
label maxSz = 0;
forAll(recvBuf, i)
{
maxSz = max(maxSz, recvBuf[i][procNo].size());
}
return maxSz;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* //

View file

@ -1,105 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 3.2
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
Namespace
Foam::PstreamGlobals
Description
Global functions and variables for working with parallel streams,
but principally for gamma/mpi
SourceFiles
PstreamGlobals.C
\*---------------------------------------------------------------------------*/
#ifndef PstreamGlobals_H
#define PstreamGlobals_H
#include "FixedList.H"
#include "labelList.H"
#include "DynamicList.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
/*---------------------------------------------------------------------------*\
Class PstreamGlobals Declaration
\*---------------------------------------------------------------------------*/
namespace PstreamGlobals
{
//- Block wait for message on port procNo
void gammaWait(const label procNo);
//- Helper routine to copy out newly received data
label copyReceive
(
const label procNo,
char* buf,
const label bufSize
);
//- Receive buffers
extern FixedList<List<List<char> >, 4> recvBuf;
//- Length of receive buffers
extern FixedList<labelList, 4> recvBufLen;
//- Currently active buffer in receiving
extern labelList recvIndex;
//- Receive buffer that has to be consumed
extern labelList consumeIndex;
//- Special message to signal resizing
const int resizeMessageLen = 6;
extern uint64_t resizeMessage[];
//- Initial buffer length. Should be able to contain the message comfortably.
const int initialBufferLen = 2*resizeMessageLen*sizeof(uint64_t);
//- Helper routine to check if a message is a resize message.
// Returns -1 if not or the new size.
label getSizeFromHeader(const char* buf, const label len);
//- Change the resize message to contain the new length
void setResizeMessage(const label len);
//- Get max size of all receive buffers to procNo
label getMaxBufSize(const int procNo);
};
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#endif
// ************************************************************************* //

View file

@ -1,235 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 3.2
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
Description
Read token and binary block from IPstream
\*---------------------------------------------------------------------------*/
#include "mpi.h"
#include "IPstream.H"
#include "PstreamGlobals.H"
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::IPstream::IPstream
(
const commsTypes commsType,
const int fromProcNo,
const label bufSize,
streamFormat format,
versionNumber version
)
:
Pstream(commsType, bufSize),
Istream(format, version),
fromProcNo_(fromProcNo),
messageSize_(0)
{
setOpened();
setGood();
MPI_Status status;
// If the buffer size is not specified, probe the incoming message
// and set it
if (!bufSize)
{
MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
buf_.setSize(messageSize_);
}
messageSize_ = read(commsType, fromProcNo_, buf_.begin(), buf_.size());
if (!messageSize_)
{
FatalErrorIn
(
"IPstream::IPstream(const int fromProcNo, "
"const label bufSize, streamFormat format, versionNumber version)"
) << "read failed"
<< Foam::abort(FatalError);
}
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::IPstream::read
(
const commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize
)
{
if (commsType == blocking || commsType == scheduled)
{
MPI_Status status;
if
(
MPI_Recv
(
buf,
bufSize,
MPI_PACKED,
procID(fromProcNo),
msgType(),
MPI_COMM_WORLD,
&status
)
)
{
FatalErrorIn
(
"IPstream::read"
"(const int fromProcNo, char* buf, std::streamsize bufSize)"
) << "MPI_Recv cannot receive incoming message"
<< Foam::abort(FatalError);
return 0;
}
// Check size of message read
label messageSize;
MPI_Get_count(&status, MPI_BYTE, &messageSize);
if (messageSize > bufSize)
{
FatalErrorIn
(
"IPstream::read"
"(const int fromProcNo, char* buf, std::streamsize bufSize)"
) << "buffer (" << label(bufSize)
<< ") not large enough for incoming message ("
<< messageSize << ')'
<< Foam::abort(FatalError);
}
return messageSize;
}
else if (commsType == nonBlocking)
{
MPI_Request request;
if
(
MPI_Irecv
(
buf,
bufSize,
MPI_PACKED,
procID(fromProcNo),
msgType(),
MPI_COMM_WORLD,
&request
)
)
{
FatalErrorIn
(
"IPstream::read"
"(const int fromProcNo, char* buf, std::streamsize bufSize)"
) << "MPI_Recv cannot start non-blocking receive"
<< Foam::abort(FatalError);
return 0;
}
PstreamGlobals::IPstream_outstandingRequests_.append(request);
return 1;
}
else
{
FatalErrorIn
(
"IPstream::read"
"(const int fromProcNo, char* buf, std::streamsize bufSize)"
) << "Unsupported communications type " << commsType
<< Foam::abort(FatalError);
return 0;
}
}
void Foam::IPstream::waitRequests()
{
if (PstreamGlobals::IPstream_outstandingRequests_.size())
{
if
(
MPI_Waitall
(
PstreamGlobals::IPstream_outstandingRequests_.size(),
PstreamGlobals::IPstream_outstandingRequests_.begin(),
MPI_STATUSES_IGNORE
)
)
{
FatalErrorIn
(
"IPstream::waitRequests()"
) << "MPI_Waitall returned with error" << endl;
}
PstreamGlobals::IPstream_outstandingRequests_.clear();
}
}
bool Foam::IPstream::finishedRequest(const label i)
{
if (i >= PstreamGlobals::IPstream_outstandingRequests_.size())
{
FatalErrorIn
(
"IPstream::finishedRequest(const label)"
) << "There are "
<< PstreamGlobals::IPstream_outstandingRequests_.size()
<< " outstanding send requests and you are asking for i=" << i
<< nl
<< "Maybe you are mixing blocking/non-blocking comms?"
<< Foam::abort(FatalError);
}
int flag;
MPI_Test
(
&PstreamGlobals::IPstream_outstandingRequests_[i],
&flag,
MPI_STATUS_IGNORE
);
return flag != 0;
}
// ************************************************************************* //

View file

@ -1,6 +0,0 @@
OPwrite.C
IPread.C
Pstream.C
PstreamGlobals.C
LIB = $(FOAM_MPI_LIBBIN)/libPstream

View file

@ -1,4 +0,0 @@
include $(RULES)/mplib$(WM_MPLIB)
EXE_INC = $(PFLAGS) $(PINC)
LIB_LIBS = $(PLIBS)

View file

@ -1,428 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 3.2
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "mpi.h"
#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include <cstring>
#include <cstdlib>
#include <csignal>
#if defined(WM_SP)
# define MPI_SCALAR MPI_FLOAT
#elif defined(WM_DP)
# define MPI_SCALAR MPI_DOUBLE
#elif defined(WM_LDP)
# define MPI_SCALAR MPI_LONG_DOUBLE
#endif
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// NOTE:
// valid parallel options vary between implementations, but flag common ones.
// if they are not removed by MPI_Init(), the subsequent argument processing
// will notice that they are wrong
void Foam::Pstream::addValidParOptions(HashTable<string>& validParOptions)
{
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("GAMMANP", "number of instances");
validParOptions.insert("machinefile", "machine file");
}
bool Foam::Pstream::init(int& argc, char**& argv)
{
MPI_Init(&argc, &argv);
int numprocs;
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myProcNo_);
if (numprocs <= 1)
{
FatalErrorIn("Pstream::init(int& argc, char**& argv)")
<< "bool Pstream::init(int& argc, char**& argv) : "
"attempt to run parallel on 1 processor"
<< Foam::abort(FatalError);
}
procIDs_.setSize(numprocs);
forAll(procIDs_, procNo)
{
procIDs_[procNo] = procNo;
}
setParRun();
# ifndef SGIMPI
string bufferSizeName = getEnv("MPI_BUFFER_SIZE");
if (bufferSizeName.size())
{
int bufferSize = atoi(bufferSizeName.c_str());
if (bufferSize)
{
MPI_Buffer_attach(new char[bufferSize], bufferSize);
}
}
else
{
FatalErrorIn("Pstream::init(int& argc, char**& argv)")
<< "Pstream::init(int& argc, char**& argv) : "
<< "environment variable MPI_BUFFER_SIZE not defined"
<< Foam::abort(FatalError);
}
# endif
int processorNameLen;
char processorName[MPI_MAX_PROCESSOR_NAME];
MPI_Get_processor_name(processorName, &processorNameLen);
//signal(SIGABRT, stop);
// Now that nprocs is known construct communication tables.
initCommunicationSchedule();
return true;
}
void Foam::Pstream::exit(int errnum)
{
# ifndef SGIMPI
int size;
char* buff;
MPI_Buffer_detach(&buff, &size);
delete[] buff;
# endif
if (errnum == 0)
{
MPI_Finalize();
::exit(errnum);
}
else
{
MPI_Abort(MPI_COMM_WORLD, errnum);
}
}
void Foam::Pstream::abort()
{
MPI_Abort(MPI_COMM_WORLD, 1);
}
void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
{
if (!Pstream::parRun())
{
return;
}
if (Pstream::nProcs() <= Pstream::nProcsSimpleSum())
{
if (Pstream::master())
{
for
(
int slave=Pstream::firstSlave();
slave<=Pstream::lastSlave();
slave++
)
{
scalar value;
if
(
MPI_Recv
(
&value,
1,
MPI_SCALAR,
Pstream::procID(slave),
Pstream::msgType(),
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
Value = bop(Value, value);
}
}
else
{
if
(
MPI_Send
(
&Value,
1,
MPI_SCALAR,
Pstream::procID(Pstream::masterNo()),
Pstream::msgType(),
MPI_COMM_WORLD
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
}
if (Pstream::master())
{
for
(
int slave=Pstream::firstSlave();
slave<=Pstream::lastSlave();
slave++
)
{
if
(
MPI_Send
(
&Value,
1,
MPI_SCALAR,
Pstream::procID(slave),
Pstream::msgType(),
MPI_COMM_WORLD
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
}
}
else
{
if
(
MPI_Recv
(
&Value,
1,
MPI_SCALAR,
Pstream::procID(Pstream::masterNo()),
Pstream::msgType(),
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
}
}
else
{
scalar sum;
MPI_Allreduce(&Value, &sum, 1, MPI_SCALAR, MPI_SUM, MPI_COMM_WORLD);
Value = sum;
/*
int myProcNo = Pstream::myProcNo();
int nProcs = Pstream::nProcs();
//
// receive from children
//
int level = 1;
int thisLevelOffset = 2;
int childLevelOffset = thisLevelOffset/2;
int childProcId = 0;
while
(
(childLevelOffset < nProcs)
&& (myProcNo % thisLevelOffset) == 0
)
{
childProcId = myProcNo + childLevelOffset;
scalar value;
if (childProcId < nProcs)
{
if
(
MPI_Recv
(
&value,
1,
MPI_SCALAR,
Pstream::procID(childProcId),
Pstream::msgType(),
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
Value = bop(Value, value);
}
level++;
thisLevelOffset <<= 1;
childLevelOffset = thisLevelOffset/2;
}
//
// send and receive from parent
//
if (!Pstream::master())
{
int parentId = myProcNo - (myProcNo % thisLevelOffset);
if
(
MPI_Send
(
&Value,
1,
MPI_SCALAR,
Pstream::procID(parentId),
Pstream::msgType(),
MPI_COMM_WORLD
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
if
(
MPI_Recv
(
&Value,
1,
MPI_SCALAR,
Pstream::procID(parentId),
Pstream::msgType(),
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
}
//
// distribute to my children
//
level--;
thisLevelOffset >>= 1;
childLevelOffset = thisLevelOffset/2;
while (level > 0)
{
childProcId = myProcNo + childLevelOffset;
if (childProcId < nProcs)
{
if
(
MPI_Send
(
&Value,
1,
MPI_SCALAR,
Pstream::procID(childProcId),
Pstream::msgType(),
MPI_COMM_WORLD
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
}
level--;
thisLevelOffset >>= 1;
childLevelOffset = thisLevelOffset/2;
}
*/
}
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// ************************************************************************* //

View file

@ -141,8 +141,11 @@ $(StringStreams)/StringStreamsPrint.C
Pstreams = $(Streams)/Pstreams
$(Pstreams)/Pstream.C
$(Pstreams)/PstreamCommsStruct.C
$(Pstreams)/PstreamGlobals.C
$(Pstreams)/IPstream.C
$(Pstreams)/OPstream.C
$(Pstreams)/IPread.C
$(Pstreams)/OPwrite.C
$(Pstreams)/PstreamsPrint.C
dictionary = db/dictionary

View file

@ -1,9 +1,10 @@
EXE_INC = \
include $(RULES)/mplib$(WM_MPLIB)
EXE_INC = $(PFLAGS) $(PINC)\
-I$(WM_THIRD_PARTY_DIR)/zlib-1.2.3
LIB_LIBS = \
LIB_LIBS = $(PLIBS)\
$(FOAM_LIBBIN)/libOSspecific.o \
-L$(FOAM_LIBBIN)/dummy -lPstream \
-lz
$(OBJECTS_DIR)/global.o: FORCE

View file

@ -1,9 +1,9 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 3.2
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
\\ / O peration |
\\ / A nd | For copyright notice see file Copyright
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
@ -26,136 +26,212 @@ Description
\*---------------------------------------------------------------------------*/
#include "mpi.h"
#include "IPstream.H"
#include "error.H"
#include "int.H"
#include "PstreamGlobals.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
namespace Foam
Foam::IPstream::IPstream
(
const commsTypes commsType,
const int fromProcNo,
const label bufSize,
streamFormat format,
versionNumber version
)
:
Pstream(commsType, bufSize),
Istream(format, version),
fromProcNo_(fromProcNo),
messageSize_(0)
{
setOpened();
setGood();
// * * * * * * * * * * * * * Private member functions * * * * * * * * * * * //
MPI_Status status;
inline void IPstream::checkEof()
{
if (bufPosition_ == messageSize_)
// If the buffer size is not specified, probe the incomming message
// and set it
if (!bufSize)
{
setEof();
MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
buf_.setSize(messageSize_);
}
messageSize_ = read(commsType, fromProcNo_, buf_.begin(), buf_.size());
if (!messageSize_)
{
FatalErrorIn
(
"IPstream::IPstream(const int fromProcNo, "
"const label bufSize, streamFormat format, versionNumber version)"
) << "read failed"
<< Foam::abort(FatalError);
}
}
template<class T>
inline void IPstream::readFromBuffer(T& t)
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::IPstream::read
(
const commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize
)
{
const size_t align = sizeof(T);
bufPosition_ = align + ((bufPosition_ - 1) & ~(align - 1));
t = reinterpret_cast<T&>(buf_[bufPosition_]);
bufPosition_ += sizeof(T);
checkEof();
}
inline void IPstream::readFromBuffer(void* data, size_t count, size_t align)
{
if (align > 1)
if (commsType == blocking || commsType == scheduled)
{
bufPosition_ = align + ((bufPosition_ - 1) & ~(align - 1));
MPI_Status status;
if
(
MPI_Recv
(
buf,
bufSize,
MPI_PACKED,
procID(fromProcNo),
msgType(),
MPI_COMM_WORLD,
&status
)
)
{
FatalErrorIn
(
"IPstream::read"
"(const int fromProcNo, char* buf, std::streamsize bufSize)"
) << "MPI_Recv cannot receive incomming message"
<< Foam::abort(FatalError);
return 0;
}
// Check size of message read
label messageSize;
MPI_Get_count(&status, MPI_BYTE, &messageSize);
if (messageSize > bufSize)
{
FatalErrorIn
(
"IPstream::read"
"(const int fromProcNo, char* buf, std::streamsize bufSize)"
) << "buffer (" << label(bufSize)
<< ") not large enough for incomming message ("
<< messageSize << ')'
<< Foam::abort(FatalError);
}
return messageSize;
}
register const char* bufPtr = &buf_[bufPosition_];
register char* dataPtr = reinterpret_cast<char*>(data);
register size_t i = count;
while (i--) *dataPtr++ = *bufPtr++;
bufPosition_ += count;
checkEof();
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
IPstream::~IPstream()
{}
Istream& IPstream::read(char& c)
{
c = buf_[bufPosition_];
bufPosition_++;
checkEof();
return *this;
}
Istream& IPstream::read(word& w)
{
size_t ws;
readFromBuffer(ws);
w = &buf_[bufPosition_];
bufPosition_ += ws + 1;
checkEof();
return *this;
}
Istream& IPstream::read(string& s)
{
size_t ss;
readFromBuffer(ss);
s = &buf_[bufPosition_];
bufPosition_ += ss + 1;
checkEof();
return *this;
}
Istream& IPstream::read(label& l)
{
readFromBuffer(l);
return *this;
}
Istream& IPstream::read(floatScalar& s)
{
readFromBuffer(s);
return *this;
}
Istream& IPstream::read(doubleScalar& s)
{
readFromBuffer(s);
return *this;
}
Istream& IPstream::read(char* data, std::streamsize count)
{
if (format() != BINARY)
else if (commsType == nonBlocking)
{
FatalErrorIn("IPstream::read(char*, std::streamsize)")
<< "stream format not binary"
MPI_Request request;
if
(
MPI_Irecv
(
buf,
bufSize,
MPI_PACKED,
procID(fromProcNo),
msgType(),
MPI_COMM_WORLD,
&request
)
)
{
FatalErrorIn
(
"IPstream::read"
"(const int fromProcNo, char* buf, std::streamsize bufSize)"
) << "MPI_Recv cannot start non-blocking receive"
<< Foam::abort(FatalError);
return 0;
}
PstreamGlobals::IPstream_outstandingRequests_.append(request);
return 1;
}
else
{
FatalErrorIn
(
"IPstream::read"
"(const int fromProcNo, char* buf, std::streamsize bufSize)"
) << "Unsupported communications type " << commsType
<< Foam::abort(FatalError);
return 0;
}
}
void Foam::IPstream::waitRequests()
{
if (PstreamGlobals::IPstream_outstandingRequests_.size())
{
if
(
MPI_Waitall
(
PstreamGlobals::IPstream_outstandingRequests_.size(),
PstreamGlobals::IPstream_outstandingRequests_.begin(),
MPI_STATUSES_IGNORE
)
)
{
FatalErrorIn
(
"IPstream::waitRequests()"
) << "MPI_Waitall returned with error" << endl;
}
PstreamGlobals::IPstream_outstandingRequests_.clear();
}
}
bool Foam::IPstream::finishedRequest(const label i)
{
if (i >= PstreamGlobals::IPstream_outstandingRequests_.size())
{
FatalErrorIn
(
"IPstream::finishedRequest(const label)"
) << "There are "
<< PstreamGlobals::IPstream_outstandingRequests_.size()
<< " outstanding send requests and you are asking for i=" << i
<< nl
<< "Maybe you are mixing blocking/non-blocking comms?"
<< Foam::abort(FatalError);
}
readFromBuffer(data, count, 8);
return *this;
}
int flag;
MPI_Test
(
&PstreamGlobals::IPstream_outstandingRequests_[i],
&flag,
MPI_STATUS_IGNORE
);
Istream& IPstream::rewind()
{
bufPosition_ = 0;
return *this;
return flag != 0;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* //

View file

@ -23,9 +23,27 @@ License
\*---------------------------------------------------------------------------*/
#include <cstring>
#include <cstdlib>
#include <csignal>
#include "mpi.h"
#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "debug.H"
#include "dictionary.H"
#include "OSspecific.H"
#if defined(WM_SP)
# define MPI_SCALAR MPI_FLOAT
#elif defined(WM_DP)
# define MPI_SCALAR MPI_DOUBLE
#elif defined(WM_LDP)
# define MPI_SCALAR MPI_LONG_DOUBLE
#endif
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
@ -204,6 +222,383 @@ void Foam::Pstream::initCommunicationSchedule()
}
// NOTE:
// valid parallel options vary between implementations, but flag common ones.
// if they are not removed by MPI_Init(), the subsequent argument processing
// will notice that they are wrong
void Foam::Pstream::addValidParOptions(HashTable<string>& validParOptions)
{
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("GAMMANP", "number of instances");
validParOptions.insert("machinefile", "machine file");
}
bool Foam::Pstream::init(int& argc, char**& argv)
{
MPI_Init(&argc, &argv);
int numprocs;
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myProcNo_);
if (numprocs <= 1)
{
FatalErrorIn("Pstream::init(int& argc, char**& argv)")
<< "bool Pstream::init(int& argc, char**& argv) : "
"attempt to run parallel on 1 processor"
<< Foam::abort(FatalError);
}
procIDs_.setSize(numprocs);
forAll(procIDs_, procNo)
{
procIDs_[procNo] = procNo;
}
setParRun();
# ifndef SGIMPI
string bufferSizeName = getEnv("MPI_BUFFER_SIZE");
if (bufferSizeName.size())
{
int bufferSize = atoi(bufferSizeName.c_str());
if (bufferSize)
{
MPI_Buffer_attach(new char[bufferSize], bufferSize);
}
}
else
{
FatalErrorIn("Pstream::init(int& argc, char**& argv)")
<< "Pstream::init(int& argc, char**& argv) : "
<< "environment variable MPI_BUFFER_SIZE not defined"
<< Foam::abort(FatalError);
}
# endif
int processorNameLen;
char processorName[MPI_MAX_PROCESSOR_NAME];
MPI_Get_processor_name(processorName, &processorNameLen);
//signal(SIGABRT, stop);
// Now that nprocs is known construct communication tables.
initCommunicationSchedule();
return true;
}
void Foam::Pstream::exit(int errnum)
{
# ifndef SGIMPI
int size;
char* buff;
MPI_Buffer_detach(&buff, &size);
delete[] buff;
# endif
if (errnum == 0)
{
MPI_Finalize();
::exit(errnum);
}
else
{
MPI_Abort(MPI_COMM_WORLD, errnum);
}
}
void Foam::Pstream::abort()
{
MPI_Abort(MPI_COMM_WORLD, 1);
}
void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
{
if (!Pstream::parRun())
{
return;
}
if (Pstream::nProcs() <= Pstream::nProcsSimpleSum())
{
if (Pstream::master())
{
for
(
int slave=Pstream::firstSlave();
slave<=Pstream::lastSlave();
slave++
)
{
scalar value;
if
(
MPI_Recv
(
&value,
1,
MPI_SCALAR,
Pstream::procID(slave),
Pstream::msgType(),
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
Value = bop(Value, value);
}
}
else
{
if
(
MPI_Send
(
&Value,
1,
MPI_SCALAR,
Pstream::procID(Pstream::masterNo()),
Pstream::msgType(),
MPI_COMM_WORLD
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
}
if (Pstream::master())
{
for
(
int slave=Pstream::firstSlave();
slave<=Pstream::lastSlave();
slave++
)
{
if
(
MPI_Send
(
&Value,
1,
MPI_SCALAR,
Pstream::procID(slave),
Pstream::msgType(),
MPI_COMM_WORLD
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
}
}
else
{
if
(
MPI_Recv
(
&Value,
1,
MPI_SCALAR,
Pstream::procID(Pstream::masterNo()),
Pstream::msgType(),
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
}
}
else
{
scalar sum;
MPI_Allreduce(&Value, &sum, 1, MPI_SCALAR, MPI_SUM, MPI_COMM_WORLD);
Value = sum;
/*
int myProcNo = Pstream::myProcNo();
int nProcs = Pstream::nProcs();
//
// receive from children
//
int level = 1;
int thisLevelOffset = 2;
int childLevelOffset = thisLevelOffset/2;
int childProcId = 0;
while
(
(childLevelOffset < nProcs)
&& (myProcNo % thisLevelOffset) == 0
)
{
childProcId = myProcNo + childLevelOffset;
scalar value;
if (childProcId < nProcs)
{
if
(
MPI_Recv
(
&value,
1,
MPI_SCALAR,
Pstream::procID(childProcId),
Pstream::msgType(),
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
Value = bop(Value, value);
}
level++;
thisLevelOffset <<= 1;
childLevelOffset = thisLevelOffset/2;
}
//
// send and receive from parent
//
if (!Pstream::master())
{
int parentId = myProcNo - (myProcNo % thisLevelOffset);
if
(
MPI_Send
(
&Value,
1,
MPI_SCALAR,
Pstream::procID(parentId),
Pstream::msgType(),
MPI_COMM_WORLD
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
if
(
MPI_Recv
(
&Value,
1,
MPI_SCALAR,
Pstream::procID(parentId),
Pstream::msgType(),
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
}
//
// distribute to my children
//
level--;
thisLevelOffset >>= 1;
childLevelOffset = thisLevelOffset/2;
while (level > 0)
{
childProcId = myProcNo + childLevelOffset;
if (childProcId < nProcs)
{
if
(
MPI_Send
(
&Value,
1,
MPI_SCALAR,
Pstream::procID(childProcId),
Pstream::msgType(),
MPI_COMM_WORLD
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
}
level--;
thisLevelOffset >>= 1;
childLevelOffset = thisLevelOffset/2;
}
*/
}
}
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// Initialise my process number to 0 (the master)